Work with delta tables in Spark
You can work with delta tables (or delta format files) to retrieve and modify data in multiple ways.
Use Spark SQL
The most common way to work with data in delta tables in Spark is to use Spark SQL. You can embed SQL statements in other languages (such as PySpark or Scala) by using the spark.sql method. For example, the following code inserts a row into the products table.
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Alternatively, you can use the %%sql magic in a notebook to run SQL statements.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Use the Delta API
When you want to work with delta files rather than catalog tables, it may be simpler to use the Delta Lake API. You can create an instance of a DeltaTable from a folder location containing files in delta format, and then use the API to modify the data in the table.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })
Use time travel to work with table versioning
Modifications made to delta tables are logged in the transaction log for the table. You can use the logged transactions to view the history of changes made to the table and to retrieve older versions of the data (a feature known as time travel).
To see the history of a table, you can use the DESCRIBE HISTORY SQL command, as shown here.
%%sql
DESCRIBE HISTORY products
The results of this statement show the transactions that have been applied to the table, as shown here (some columns have been omitted):
| version | timestamp | operation | operationParameters |
|---|---|---|---|
| 2 | 2023-04-04T21:46:43Z | UPDATE | {"predicate":"(ProductId = 1)"} |
| 1 | 2023-04-04T21:42:48Z | WRITE | {"mode":"Append","partitionBy":"[]"} |
| 0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
To see the history of an external table, you can specify the folder location instead of the table name.
%%sql
DESCRIBE HISTORY 'Files/mytable'
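You can also retrieve the history programmatically by using the Delta Lake API, which exposes a history method on the DeltaTable object that returns the logged transactions as a DataFrame. The following is a minimal sketch, assuming the deltaTable object created earlier.
# Retrieve the table history as a DataFrame by using the Delta Lake API
history_df = deltaTable.history()
history_df.select("version", "timestamp", "operation").show(truncate=False)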
You can retrieve data from a specific version of the data by reading the delta file location into a dataframe, specifying the version required as a versionAsOf option:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Alternatively, you can specify a timestamp by using the timestampAsOf option:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)