Ändern und Speichern von Dataframes
Apache Spark stellt das Dataframeobjekt als primäre Struktur für die Arbeit mit Daten bereit. Sie können Dataframes verwenden, um Daten abzufragen und zu transformieren und die Ergebnisse in einem Data Lake zu speichern. Um Daten in einen Dataframe zu laden, verwenden Sie die Funktion spark.read und geben das Dateiformat, den Pfad und optional das Schema der zu lesenden Daten an. Der folgende Code lädt beispielsweise Daten aus allen .csv-Dateien im Ordner orders in einen Dataframe mit dem Namen order_details und zeigt dann die ersten fünf Datensätze an.
order_details = spark.read.csv('/orders/*.csv', header=True, inferSchema=True)
display(order_details.limit(5))
Transformieren der Datenstruktur
Nach dem Laden der Quelldaten in einen Dataframe können Sie die Methoden und Spark-Funktionen des Dataframeobjekts verwenden, um sie zu transformieren. Zu den typischen Vorgängen für einen Dataframe gehören:
- Filtern von Zeilen und Spalten
- Umbenennen von Spalten
- Erstellen neuer Spalten, die häufig von vorhandenen Spalten abgeleitet werden
- Ersetzen von NULL- oder anderen Werten
Im folgenden Beispiel verwendet der Code die split
-Funktion, um die Werte in der Spalte CustomerName in zwei neue Spalten mit den Namen FirstName und LastName zu trennen. Anschließend wird die drop
-Methode verwendet, um die ursprüngliche CustomerName-Spalte zu löschen.
from pyspark.sql.functions import split, col
# Create the new FirstName and LastName fields
transformed_df = order_details.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Remove the CustomerName field
transformed_df = transformed_df.drop("CustomerName")
display(transformed_df.limit(5))
Sie können die volle Leistungsfähigkeit der Spark-SQL-Bibliothek nutzen, um die Daten zu transformieren, indem Sie Zeilen filtern, Spalten ableiten, entfernen, umbenennen und alle anderen erforderlichen Datenänderungen vornehmen.
Speichern der transformierten Daten
Sobald Ihr DataFrame die erforderliche Struktur hat, können Sie die Ergebnisse in einem unterstützten Format in Ihrem Data Lake speichern.
Im folgenden Codebeispiel wird der DataFrame in einer Parquet-Datei im Data Lake gespeichert, wobei jede vorhandene Datei mit demselben Namen ersetzt wird.
transformed_df.write.mode("overwrite").parquet('/transformed_data/orders.parquet')
print ("Transformed data saved!")
Hinweis
Das Parquet-Format wird in der Regel für Datendateien verwendet, die Sie für die weitere Analyse oder Erfassung in einem Analysespeicher verwenden. Parquet ist ein sehr effizientes Format, das von den meisten umfangreichen Datenanalysesystemen unterstützt wird. Tatsächlich kann Ihre Datentransformation manchmal einfach darin bestehen, Daten aus einem anderen Format (z. B. CSV) in Parquet zu konvertieren!