Monday, April 14, 2025

Performing Incremental Data Loads

When your data source continuously generates new or updated records, you don’t want to reload the entire dataset each time. Instead, you can load only the changes (i.e., incremental data) and merge them into your Delta table. Delta Lake provides the powerful MERGE API to do this.

Example: Upsert New Records Using Delta Lake’s MERGE API

Suppose you have a Delta table stored at /mnt/delta/my_table and you receive a new batch of records as a DataFrame called new_data_df. You can use the following code to merge (upsert) the incremental changes:

from delta.tables import DeltaTable

# Example incremental data (new or updated rows)
new_data = [
    (1, "Alice", 70000.0),   # existing record with id=1 (updated values)
    (3, "Charlie", 80000.0)  # new record with id=3
]
columns = ["id", "name", "salary"]
new_data_df = spark.createDataFrame(new_data, columns)

# Reference the existing Delta table
# (the target table is assumed to include a last_updated column)
deltaTable = DeltaTable.forPath(spark, "/mnt/delta/my_table")

# Perform MERGE to upsert the new data
deltaTable.alias("t").merge(
    new_data_df.alias("s"),
    "t.id = s.id"  # join condition: match records on id
).whenMatchedUpdate(
    set={
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).execute()
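
After the merge completes, you can read the table back to confirm that the existing row (id=1) was updated and the new row (id=3) was inserted:

# Verify the result of the upsert
spark.read.format("delta").load("/mnt/delta/my_table").orderBy("id").show()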

Explanation:

Merge Operation: The code matches incoming rows (aliased s) against existing rows (aliased t) on the id column.
When Matched: If a record with the same id already exists, the merge updates its name and salary and records the update time in last_updated.
When Not Matched: If no match is found, the merge inserts the incoming record as a new row.
This incremental load avoids reprocessing the entire dataset every time new data arrives, which keeps the pipeline efficient and makes it well suited to real-time or near-real-time analytics.
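
If the incremental records arrive continuously rather than in occasional batches, the same MERGE logic can be wrapped in a Structured Streaming foreachBatch sink. The sketch below is one way to do this; the landing path /mnt/raw/my_table_changes and the checkpoint location /mnt/checkpoints/my_table_merge are hypothetical and should be adjusted to your environment.

from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # Apply the same upsert logic to each micro-batch of incoming changes
    target = DeltaTable.forPath(spark, "/mnt/delta/my_table")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.id = s.id")
        .whenMatchedUpdate(set={
            "name": "s.name",
            "salary": "s.salary",
            "last_updated": "current_timestamp()"
        })
        .whenNotMatchedInsert(values={
            "id": "s.id",
            "name": "s.name",
            "salary": "s.salary",
            "last_updated": "current_timestamp()"
        })
        .execute())

# Stream new files from a hypothetical landing location and merge each micro-batch
(spark.readStream
    .format("delta")                        # assumes the changes land as a Delta table
    .load("/mnt/raw/my_table_changes")      # hypothetical source path
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/mnt/checkpoints/my_table_merge")  # hypothetical checkpoint path
    .trigger(availableNow=True)             # Spark 3.3+; use trigger(once=True) on older versions
    .start())

The checkpoint lets Spark remember which change files it has already processed, so each run picks up only the data that arrived since the previous run.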
