Lavorare con DataFrame e tables in R
Importante
SparkR in Databricks è deprecato in Databricks Runtime 16.0 e versioni successive. Databricks consiglia di usare sparklyr invece.
Questo articolo descrive come usare pacchetti R come SparkR, sparklyre dplyr per usare R data.frame
s, i dataframe Sparke in memoria tables.
Si noti che quando si lavora con SparkR, sparklyr e dplyr, è possibile completare un'operazione specifica con tutti questi pacchetti ed è possibile usare il pacchetto più comodo. Ad esempio, per eseguire una query, è possibile chiamare funzioni come SparkR::sql
, sparklyr::sdf_sql
e dplyr::select
. In altri casi, potrebbe essere possibile completare un'operazione con solo uno o due di questi pacchetti e l'operazione scelta dipende dallo scenario di utilizzo. Ad esempio, il modo in cui si chiama sparklyr::sdf_quantile
differisce leggermente dal modo in cui si chiama dplyr::percentile_approx
, anche se entrambe le funzioni calcolano i quantili.
È possibile usare SQL come bridge tra SparkR e sparklyr. Ad esempio, puoi usare SparkR::sql
per interrogare tables che hai creato con sparklyr. È possibile utilizzare sparklyr::sdf_sql
per eseguire query su tables che hai creato con SparkR. E il codice dplyr
viene sempre tradotto in SQL in memoria prima che venga eseguito. Vedere anche interoperabilità API e traduzione SQL.
Carica SparkR, sparklyr e dplyr
I pacchetti SparkR, sparklyr e dplyr sono inclusi nel runtime di Databricks installato in Azure Databricks cluster. Pertanto, non è necessario chiamare il normale install.package
prima di poter iniziare a chiamare questi pacchetti. Tuttavia, è comunque necessario caricare questi pacchetti con library
prima. Ad esempio, dall'interno di un notebook R in un'area di lavoro di Azure Databricks, eseguire il codice seguente in una cella del notebook per caricare SparkR, sparklyr e dplyr:
library(SparkR)
library(sparklyr)
library(dplyr)
Connettere sparklyr a un cluster
Dopo aver caricato sparklyr, è necessario chiamare sparklyr::spark_connect
per connettersi al cluster, specificando il metodo di connessione databricks
. Ad esempio, eseguire il codice seguente in una cella del notebook per connettersi al cluster che ospita il notebook:
sc <- spark_connect(method = "databricks")
Al contrario, un notebook di Azure Databricks stabilisce già un SparkSession
nel cluster per l'uso con SparkR, quindi non è necessario chiamare SparkR::sparkR.session
prima di poter iniziare a chiamare SparkR.
Caricare un file di dati JSON nell'area di lavoro
Molti esempi di codice di questo articolo si basano su dati situati in una posizione specifica nel tuo ambiente di lavoro di Azure Databricks, con nomi specifici column e tipi di dati definiti. I dati per questo esempio di codice hanno origine in un file JSON denominato book.json
dall'interno di GitHub. Per get questo file e caricarlo nell'area di lavoro:
- Passare al file books.json in GitHub e usare un editor di testo per copiarne il contenuto in un file denominato
books.json
da qualche parte nel computer locale. - Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Catalog.
- Fare clic su Crea Table.
- Nella scheda Carica file, rilascia il file
books.json
dal computer locale nella casella Rilascia file per il caricamento. In alternativa, selectfare clic per cercaree selezionare il filebooks.json
dal computer locale.
Per impostazione predefinita, Azure Databricks carica il file
Non fare clic su Crea Table con l'interfaccia utente o su Crea Table in Notebook. Gli esempi di codice in questo articolo utilizzano i dati presenti nel file books.json
caricato in questa posizione DBFS.
Leggere i dati JSON in un DataFrame
Usare sparklyr::spark_read_json
per leggere il file JSON caricato in un dataframe, specificando la connessione, il percorso del file JSON e un nome per la rappresentazione interna table dei dati. Per questo esempio, è necessario specificare che il file book.json
contiene più righe. Specificare il columns’ schema è facoltativo. In caso contrario, sparklyr deduce il columns’ schema per impostazione predefinita. Ad esempio, eseguire il codice seguente in una cella del notebook per leggere i dati del file JSON caricato in un dataframe denominato jsonDF
:
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
Stampare le prime righe di un dataframe
È possibile usare SparkR::head
, SparkR::show
o sparklyr::collect
per stampare le prime righe di un dataframe. Per impostazione predefinita, head
stampa le prime sei righe.
show
e collect
stampano le prime 10 righe. Ad esempio, eseguire il codice seguente in una cella del notebook per stampare le prime righe del dataframe denominato jsonDF
:
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
Esegui query SQL e scrivi e leggi da un table
È possibile usare le funzioni dplyr per eseguire query SQL in un dataframe. Ad esempio, eseguire il codice seguente in una cella del notebook per usare dplyr::group_by
e dployr::count
per get conteggi per autore dal dataframe denominato jsonDF
. Usare dplyr::arrange
e dplyr::desc
per ordinare il risultato in ordine decrescente in base ai conteggi. Poi vengono stampate per impostazione predefinita le prime 10 righe.
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
È quindi possibile usare sparklyr::spark_write_table
per scrivere il risultato in un table in Azure Databricks. Ad esempio, eseguire il codice seguente in una cella del notebook per rieseguire la query e quindi scrivere il risultato in un table denominato json_books_agg
:
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
Per verificare che il table sia stato creato, è quindi possibile usare sparklyr::sdf_sql
insieme a SparkR::showDF
per visualizzare i dati del table. Ad esempio, eseguire il codice seguente in una cella del notebook per interrogare il table in un DataFrame e quindi usare sparklyr::collect
per stampare per impostazione predefinita le prime 10 righe del DataFrame.
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
È anche possibile usare sparklyr::spark_read_table
per eseguire operazioni simili. Ad esempio, eseguire il codice seguente in una cella del notebook per interrogare il DataFrame precedente chiamato jsonDF
in un altro DataFrame e quindi usare sparklyr::collect
per stampare per impostazione predefinita le prime 10 righe del DataFrame.
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
Aggiungere columns e calcolare columnvalues in un DataFrame
È possibile usare le funzioni dplyr per aggiungere columns ai dataframe e per calcolare columns' values.
Ad esempio, esegui il codice seguente in una cella del notebook per get il contenuto del DataFrame denominato jsonDF
. Usare dplyr::mutate
per aggiungere un column denominato today
e riempire il nuovo column con il timestamp corrente. Scrivere quindi questi contenuti in un nuovo dataframe denominato withDate
e usare dplyr::collect
per stampare le prime 10 righe del nuovo dataframe per impostazione predefinita.
Nota
dplyr::mutate
accetta solo argomenti conformi alle funzioni integrate di Hive (note anche come UDF) e funzioni di aggregazione integrate (note anche come UDAF). Per informazioni generali, vedere Funzioni Hive. Per informazioni sulle funzioni relative alla data in questa sezione, vedere Funzioni data.
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
Usare ora dplyr::mutate
per aggiungere altre due columns al contenuto del dataframe withDate
. Il nuovo month
e il year
columns contengono il mese numerico e l'anno del today
column. Scrivere quindi questi contenuti in un nuovo DataFrame denominato withMMyyyy
e usare dplyr::select
insieme a dplyr::collect
per stampare author
, title
, month
e year
columns delle prime dieci righe del nuovo DataFrame di default:
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
Usare ora dplyr::mutate
per aggiungere altre due columns al contenuto del dataframe withMMyyyy
. Il nuovo formatted_date
columns contiene la parte yyyy-MM-dd
del today
column, mentre il nuovo day
column contiene il giorno numerico del nuovo formatted_date
column. Scrivere quindi questi contenuti in un nuovo DataFrame denominato withUnixTimestamp
e usare dplyr::select
insieme a dplyr::collect
per stampare il title
, formatted_date
e day
columns delle prime dieci righe del nuovo DataFrame di default:
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
Creare una visualizzazione temporanea
È possibile creare views temporanei denominati in memoria basati su dataframe esistenti. Ad esempio, eseguire il codice seguente in una cella del notebook per usare SparkR::createOrReplaceTempView
per get il contenuto del dataframe precedente denominato jsonTable
e impostarne una visualizzazione temporanea denominata timestampTable
. Usare quindi sparklyr::spark_read_table
per leggere il contenuto della visualizzazione temporanea. Utilizzare sparklyr::collect
per stampare per impostazione predefinita le prime 10 righe del table temporaneo.
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
Eseguire un'analisi statistica su un dataframe
È possibile usare sparklyr insieme a dplyr per le analisi statistiche.
Ad esempio, creare un dataframe in cui eseguire le statistiche. A tale scopo, eseguire il codice seguente in una cella del notebook per usare sparklyr::sdf_copy_to
per scrivere il contenuto del set di dati iris
integrato in R in un dataframe denominato iris
. Utilizzare sparklyr::sdf_collect
per stampare per default le prime 10 righe del table temporaneo.
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
Usare ora dplyr::group_by
per raggruppare le righe in base al Species
column. Usare dplyr::summarize
insieme a dplyr::percentile_approx
per calcolare le statistiche di riepilogo in base al 25°, 50°, 75° e 100° quantile del Sepal_Length
column da Species
. Usare sparklyr::collect
per stampare i risultati:
Nota
dplyr::summarize
accetta solo argomenti conformi alle funzioni incorporate di Hive (note anche come UDFs) e funzioni di aggregazione incorporate (note anche come UDAFs). Per informazioni generali, vedere Funzioni Hive. Per informazioni su percentile_approx
, vedere funzioni di aggregazione integrate (UDAF).
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
È possibile calcolare risultati simili, ad esempio usando sparklyr::sdf_quantile
:
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9