SparkR kullanma
SparkR , R'den Apache Spark'ı kullanmak için hafif bir ön uç sağlayan bir R paketidir. SparkR, seçim, filtreleme, toplama gibi işlemleri destekleyen dağıtılmış bir veri çerçevesi uygulaması sağlar. SparkR, MLlib kullanarak dağıtılmış makine öğrenmesini de destekler.
Spark toplu iş tanımları aracılığıyla veya etkileşimli Microsoft Fabric not defterleriyle SparkR kullanın.
R desteği yalnızca Spark3.1 veya üzerinde kullanılabilir. Spark 2.4'te R desteklenmez.
Önkoşullar
Microsoft Fabric aboneliği alın. Alternatif olarak, ücretsiz bir Microsoft Fabric deneme sürümüne kaydolun.
Synapse Veri Bilimi deneyimine geçmek için giriş sayfanızın sol tarafındaki deneyim değiştiriciyi kullanın.
Not defterini açın veya oluşturun. Nasıl yapılacağını öğrenmek için bkz . Microsoft Fabric not defterlerini kullanma.
Birincil dili değiştirmek için dil seçeneğini SparkR (R) olarak ayarlayın.
Not defterinizi bir göle ekleyin. Sol tarafta Ekle'yi seçerek mevcut bir göl evi ekleyin veya bir göl evi oluşturun.
SparkR DataFrame'leri okuma ve yazma
Yerel R data.frame'den SparkR DataFrame okuma
DataFrame oluşturmanın en basit yolu, yerel bir R data.frame'i Spark DataFrame'e dönüştürmektir.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Lakehouse'dan SparkR DataFrame okuma ve yazma
Veriler küme düğümlerinin yerel dosya sisteminde depolanabilir. Lakehouse'dan SparkR DataFrame okumak ve yazmak için genel yöntemler ve write.df
şeklindedirread.df
. Bu yöntemler dosyanın yüklenmesi için yolu ve veri kaynağı türünü alır. SparkR, CSV, JSON, metin ve Parquet dosyalarını yerel olarak okumayı destekler.
Bir Lakehouse'ı okumak ve yazmak için önce oturumunuza ekleyin. Not defterinin sol tarafında Ekle'yi seçerek mevcut bir Lakehouse ekleyin veya bir Lakehouse oluşturun.
Not
veya gibi read.df
Spark paketlerini kullanarak Lakehouse dosyalarına erişmek için, Spark için ADFS yolunu veya göreli yolunu kullanın.write.df
Lakehouse gezgininde, erişmek istediğiniz dosya veya klasöre sağ tıklayın ve bağlam menüsünden Spark'ın ADFS yolunu veya göreli yolunu kopyalayın.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric tidyverse
önceden yüklenmiş. ve readr::write_csv()
kullanarak readr::read_csv()
Lakehouse dosyalarını okuma ve yazma gibi tanıdık R paketlerinizde Lakehouse dosyalarına erişebilirsiniz.
Not
R paketlerini kullanarak Lakehouse dosyalarına erişmek için Dosya API'sinin yolunu kullanmanız gerekir. Lakehouse gezgininde, erişmek istediğiniz dosyaya veya klasöre sağ tıklayın ve bağlam menüsünden Dosya API'sinin yolunu kopyalayın.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
SparkSQL sorgularını kullanarak Lakehouse'unuzda bir SparkR Dataframe de okuyabilirsiniz.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
DataFrame işlemleri
SparkR DataFrames, yapılandırılmış veri işleme gerçekleştirmek için birçok işlevi destekler. Aşağıda bazı temel örnekler verilmiştir. SparkR API belgelerinde tam bir liste bulunabilir.
Satır ve sütun seçme
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Gruplandırma ve toplama
SparkR veri çerçeveleri, gruplandırma sonrasında verileri toplamak için yaygın olarak kullanılan birçok işlevi destekler. Örneğin, aşağıda gösterildiği gibi sadık veri kümesinde bekleme süresinin histogramını hesaplayabiliriz
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Sütun işlemleri
SparkR, veri işleme ve toplama için sütunlara doğrudan uygulanabilen birçok işlev sağlar. Aşağıdaki örnekte temel aritmetik işlevlerin kullanımı gösterilmektedir.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Kullanıcı tanımlı işlevi uygulama
SparkR, kullanıcı tanımlı işlevlerin çeşitli türlerini destekler:
veya ile dapply
büyük bir veri kümesinde işlev çalıştırma dapplyCollect
dapply
bir öğesinin her bölümüne bir SparkDataFrame
işlev uygulama. her bölümüne uygulanacak işlevin SparkDataFrame
tek bir parametresi olmalıdır ve data.frame her bölüme karşılık gelir. İşlevin çıkışı bir data.frame
olmalıdır. Şema, sonuçta SparkDataFrame
elde edilen öğesinin satır biçimini belirtir. Döndürülen değerin veri türleriyle eşleşmelidir.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Depply gibi, bir SparkDataFrame
öğesinin her bölümüne bir işlev uygulayın ve sonucu geri toplayın. işlevinin çıkışı bir data.frame
olmalıdır. Ancak bu kez şemanın geçirilmesi gerekli değildir. dapplyCollect
İşlevin çıkışları tüm bölümde çalıştırıldığında sürücüye çekilemezse ve sürücü belleğine sığmazsa başarısız olabileceğini unutmayın.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
veya ile gapply
giriş sütunlarına göre gruplandırma yaparak büyük bir veri kümesinde işlev çalıştırma gapplyCollect
gapply
Bir öğesinin SparkDataFrame
her grubuna bir işlev uygulama. işlevi, öğesinin her grubuna SparkDataFrame
uygulanacaktır ve yalnızca iki parametre içermelidir: gruplandırma anahtarı ve bu anahtara karşılık gelen R data.frame
. Gruplar sütunlardan SparkDataFrames
seçilir. işlevinin çıkışı bir data.frame
olmalıdır. Şema, sonuçta SparkDataFrame
elde edilen öğesinin satır biçimini belirtir. Spark veri türlerinden R işlevinin çıkış şemasını temsil etmelidir. Döndürülen data.frame
sütun adları kullanıcı tarafından ayarlanır.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
gibi gapply
, bir öğesinin SparkDataFrame
her grubuna bir işlev uygular ve sonucu R'ye data.frame
geri toplar. işlevinin çıkışı bir data.frame
olmalıdır. Ancak, şemanın geçirilmesi gerekmez. gapplyCollect
İşlevin çıkışları tüm bölümde çalıştırıldığında sürücüye çekilemezse ve sürücü belleğine sığmazsa başarısız olabileceğini unutmayın.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
spark.lapply ile dağıtılmış yerel R işlevlerini çalıştırma
spark.lapply
Yerel R'dekine benzer şekilde lapply
, spark.lapply
bir öğe listesi üzerinde bir işlev çalıştırır ve hesaplamaları Spark ile dağıtır. Bir işlevi, bir listenin öğelerine doParallel
lapply
benzer bir şekilde uygular. Tüm hesaplamaların sonuçları tek bir makineye sığmalıdır. Böyle bir durum söz konusu değilse, gibi df <- createDataFrame(list)
bir şey yapabilir ve kullanabilirler dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
SparkR'den SQL sorguları çalıştırma
SparkR DataFrame, verileri üzerinde SQL sorguları çalıştırmanızı sağlayan geçici bir görünüm olarak da kaydedilebilir. sql işlevi, uygulamaların SQL sorgularını program aracılığıyla çalıştırmasına olanak tanır ve sonucu SparkR DataFrame olarak döndürür.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Makine öğrenimi
SparkR, MLLib algoritmalarının çoğunu kullanıma sunar. SparkR, modeli eğitmek için arka planda MLlib kullanır.
Aşağıdaki örnekte SparkR kullanarak Gauss GLM modelinin nasıl derlendği gösterilmektedir. Doğrusal regresyon çalıştırmak için aileyi olarak "gaussian"
ayarlayın. Lojistik regresyon çalıştırmak için aileyi olarak "binomial"
ayarlayın. SparkML GLM
SparkR kullanırken otomatik olarak kategorik özelliklerin tek sık kodlamasını gerçekleştirir, böylece el ile yapılması gerekmez. Dize ve Çift tür özelliklerinin ötesinde, diğer MLlib bileşenleriyle uyumluluk için MLlib Vector özelliklerine de sığabilir.
Hangi makine öğrenmesi algoritmalarının desteklendiği hakkında daha fazla bilgi edinmek için SparkR ve MLlib belgelerini ziyaret edebilirsiniz.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)