Transformieren von Daten mit SQL
Mit der SparkSQL-Bibliothek, die die Dataframestruktur bereitstellt, können Sie auch SQL für die Arbeit mit Daten verwenden. Mit diesem Ansatz können Sie Daten in Dataframes mithilfe von SQL-Abfragen abfragen und transformieren und die Ergebnisse beständig als Tabellen speichern.
Hinweis
Tabellen sind Abstraktionen von Metadaten über Dateien. Die Daten werden nicht in einer relationalen Tabelle gespeichert, aber die Tabelle bietet eine relationale Ebene über den Dateien im Data Lake.
Definieren von Tabellen und Ansichten
Tabellendefinitionen in Spark werden im Metastore gespeichert, einer Metadatenebene, die relationale Abstraktionen über Dateien kapselt. Externe Tabellen sind relationale Tabellen im Metastore, die auf Dateien an einem von Ihnen angegebenen Data Lake-Speicherort verweisen. Sie können auf diese Daten zugreifen, indem Sie die Tabelle abfragen oder die Dateien direkt aus dem Data Lake lesen.
Hinweis
Externe Tabellen sind „lose an die zugrunde liegenden Dateien gebunden“, und durch das Löschen der Tabelle werden die Dateien nicht gelöscht. Auf diese Weise können Sie Spark verwenden, um die transformationsintensiven Aufgaben durchzuführen und dann die Daten im Lake zu speichern. Danach können Sie die Tabelle löschen, und nachgeschaltete Prozesse können auf diese optimierten Strukturen zugreifen. Sie können auch verwaltete Tabellen definieren, für die die zugrunde liegenden Datendateien in einem intern verwalteten Speicherort gespeichert werden, der dem Metastore zugeordnet ist. Verwaltete Tabellen sind „eng an die Dateien gebunden“, und durch das Löschen einer Tabelle werden die zugehörigen Dateien gelöscht.
Im folgenden Codebeispiel wird ein Dataframe (aus CSV-Dateien geladen) als externer Tabellenname sales_orders gespeichert. Die Dateien werden im Ordner /sales_orders_table im Data Lake gespeichert.
order_details.write.saveAsTable('sales_orders', format='parquet', mode='overwrite', path='/sales_orders_table')
Verwenden von SQL zum Abfragen und Transformieren der Daten
Nach dem Definieren einer Tabelle können Sie SQL nutzen, um die darin enthaltenen Daten abzufragen und zu transformieren. Der folgende Code erstellt zwei neue abgeleitete Spalten namens Year (Jahr) und Month (Month) und erstellt dann die neue Tabelle transformed_orders mit den neu hinzugefügten abgeleiteten Spalten.
# Create derived columns
sql_transform = spark.sql("SELECT *, YEAR(OrderDate) AS Year, MONTH(OrderDate) AS Month FROM sales_orders")
# Save the results
sql_transform.write.partitionBy("Year","Month").saveAsTable('transformed_orders', format='parquet', mode='overwrite', path='/transformed_orders_table')
Die Datendateien für die neue Tabelle werden in einer Hierarchie von Ordnern im Format Year=*NNNN* / Month=*N* gespeichert, wobei jeder Ordner eine Parquet-Datei für die entsprechenden Bestellungen nach Jahr und Monat enthält.
Abfragen des Metastores
Da diese neue Tabelle im Metastore erstellt wurde, können Sie SQL verwenden, um sie direkt mit dem Magic-Befehl %%sql in der ersten Zeile abzufragen, um anzugeben, dass die SQL-Syntax wie im folgenden Skript verwendet werden soll:
%%sql
SELECT * FROM transformed_orders
WHERE Year = 2021
AND Month = 1
Löschen von Tabellen
Wenn Sie mit externen Tabellen arbeiten, können Sie den DROP
-Befehl verwenden, um die Tabellendefinitionen aus dem Metastore zu löschen, ohne dass sich dies auf die Dateien im Data Lake auswirkt. Mit diesem Ansatz können Sie den Metastore bereinigen, nachdem Sie SQL zum Transformieren der Daten verwendet haben, während die transformierten Datendateien für nachgelagerte Datenanalyse- und Erfassungsprozesse verfügbar gemacht werden.
%%sql
DROP TABLE transformed_orders;
DROP TABLE sales_orders;