Trabajar con datos en un objeto DataFrame de Spark

Completado

De forma nativa, Spark usa una estructura de datos denominada conjunto de datos distribuido resistente (RDD); pero aunque puede escribir código que funcione directamente con RDD, la estructura de datos más usada para trabajar con datos estructurados en Spark es el elemento dataframe, que se proporciona como parte de la biblioteca Spark SQL. Los elementos dataframe de Spark son similares a los de la biblioteca Pandas de Python, pero están optimizados para funcionar en el entorno de procesamiento distribuido de Spark.

Nota

Además de Dataframe API, Spark SQL proporciona una API Dataset fuertemente tipada que se admite en Java y Scala. En este módulo se centrará en Dataframe API.

Carga de datos en un elemento dataframe

Ahora se explorará un ejemplo hipotético para ver cómo puede usar un elemento dataframe para trabajar con datos. Supongamos que tiene los siguientes datos en un archivo de texto delimitado por comas denominado products.csv en la carpeta Files/data del almacén de lago:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Inferencia de un esquema

En un cuaderno de Spark, puede usar el código PySpark siguiente para cargar los datos del archivo en un objeto DataFrame y mostrar las primeras 10 filas:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La línea %%pyspark al principio se denomina comando magic e indica a Spark que el lenguaje usado en esta celda es PySpark. Puede seleccionar el lenguaje que quiera usar como predeterminado en la barra de herramientas de la interfaz de Notebook y, después, usar un comando magic a fin de invalidar esa opción para una celda específica. Por ejemplo, este es el código de Scala equivalente para el ejemplo de datos de productos:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

El comando magic %%spark se usa para especificar Scala.

Los dos ejemplos de código generarían una salida similar a la siguiente:

ProductID ProductName Category ListPrice
771 Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
772 Mountain-100 Silver, 42 Bicicletas de montaña 3399.9900
773 Mountain-100 Silver, 44 Bicicletas de montaña 3399.9900
... ... ... ...

Especificación de un esquema explícito

En el ejemplo anterior, la primera fila del archivo CSV contenía los nombres de columna y Spark ha podido deducir el tipo de datos de cada columna a partir de los datos que contiene. También puede especificar un esquema explícito para los datos, lo que resulta útil cuando los nombres de columna no se incluyen en el archivo de datos, como en este archivo CSV de ejemplo:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

En el siguiente ejemplo de PySpark se muestra cómo especificar un esquema para el elemento dataframe que se va a cargar desde un archivo denominado product-data.csv en este formato:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

De nuevo, los resultados serán similares a los siguientes:

ProductID ProductName Category ListPrice
771 Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
772 Mountain-100 Silver, 42 Bicicletas de montaña 3399.9900
773 Mountain-100 Silver, 44 Bicicletas de montaña 3399.9900
... ... ... ...

Sugerencia

La especificación de un esquema explícito también mejora el rendimiento.

Filtrado y agrupación de elementos dataframe

Puede usar los métodos de la clase Dataframe para filtrar, ordenar, agrupar y manipular los datos que contiene. Por ejemplo, en el ejemplo de código siguiente se usa el método select para recuperar las columnas ProductID y ListPrice del elemento dataframe df que contiene datos de producto del ejemplo anterior:

pricelist_df = df.select("ProductID", "ListPrice")

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Como sucede con la mayoría de los métodos de manipulación de datos, select devuelve un nuevo objeto dataframe.

Sugerencia

La selección de un subconjunto de columnas de un elemento dataframe es una operación común, que también se puede lograr mediante la siguiente sintaxis más corta:

pricelist_df = df["ProductID", "ListPrice"]

Puede "encadenar" métodos para realizar una serie de manipulaciones que generen como resultado un elemento dataframe transformado. Por ejemplo, en este código de ejemplo se encadenan los métodos select y where para crear un elemento dataframe que contenga las columnas ProductName y ListPrice para productos con una categoría de Mountain Bikes o Road Bikes:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

ProductName Category ListPrice
Mountain-100 Silver, 38 Bicicletas de montaña 3399.9900
Road-750 Black, 52 Bicicletas de carretera 539.9900
... ... ...

Para agrupar y agregar datos, puede usar el método groupBy y las funciones de agregado. Por ejemplo, en el código de PySpark siguiente se cuenta el número de productos para cada categoría:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Los resultados de este ejemplo de código tendrían un aspecto similar al siguiente:

Category count
Tubos de dirección 3
Ruedas 14
Bicicletas de montaña 32
... ...

Guardado de un objeto DataFrame

A menudo, querrá usar Spark para transformar los datos sin procesar y guardar los resultados para un análisis más exhaustivo o su procesamiento de bajada. En el ejemplo de código siguiente se guarda el objeto DataFrame en un archivo Parquet del lago de datos, reemplazando cualquier archivo existente con el mismo nombre.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Nota

Normalmente, el formato Parquet es el preferido para los archivos de datos que usará para su posterior análisis o ingesta en un almacén analítico. Parquet es un formato muy eficaz que es compatible con la mayoría de los sistemas de análisis de datos a gran escala. De hecho, a veces el requisito de transformación de datos puede ser simplemente convertir datos de otro formato (como CSV) a Parquet.

Creación de particiones del archivo de salida

La creación de particiones es una técnica de optimización que permite a Spark maximizar el rendimiento en los nodos de trabajo. Se pueden lograr más mejoras de rendimiento al filtrar datos en consultas mediante la eliminación de E/S de disco innecesarias.

Para guardar un objeto DataFrame como un conjunto de archivos con particiones, use el método partitionBy al escribir los datos. En el ejemplo siguiente, se guarda el objeto DataFrame bikes_df (que contiene los datos del producto para las categorías Mountain Bikes y Road Bikes) y se crean particiones de los datos por categoría:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Los nombres de carpeta generados al crear particiones de un objeto DataFrame incluyen el nombre y el valor de la columna de partición en formato columna=valor, por lo que el ejemplo de código crea una carpeta llamada bike_data que contiene las subcarpetas siguientes:

  • Category=Mountain Bikes
  • Category=Road Bikes

Cada subcarpeta contiene uno o varios archivos parquet con los datos del producto para la categoría adecuada.

Nota

Puede crear particiones de los datos mediante varias columnas, lo que da como resultado una jerarquía de carpetas para cada clave de partición. Por ejemplo, puede crear particiones de los datos de pedidos de ventas por año y mes, de modo que la jerarquía de carpetas incluya una carpeta para cada valor de año que, a su vez, contenga una subcarpeta para cada valor de mes.

Carga de datos con particiones

Al leer datos con particiones en un objeto DataFrame, puede cargar datos de cualquier carpeta en la jerarquía mediante la especificación de valores explícitos o caracteres comodín para los campos con particiones. En el ejemplo siguiente se cargan datos de los productos en la categoría Road Bikes:

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Nota

Las columnas de partición especificadas en la ruta de acceso del archivo se omiten en el objeto DataFrame resultante. Los resultados generados por la consulta de ejemplo no incluyen una columna Category, ya que la categoría de todas las filas sería Road Bikes.