Performing Incremental Data Loads
When your data source continuously generates new or updated records, you don’t want to reload the entire dataset each time. Instead, you can load only the changes (i.e., incremental data) and merge them into your Delta table. Delta Lake provides the powerful MERGE API to do this.
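In practice, the incremental batch is usually identified at the source with a high-watermark filter, for example by selecting only the rows modified since the last successful load. The sketch below illustrates that pattern; the source table source_db.employees, its updated_at column, and the stored last_load_ts value are assumptions for illustration, not part of the example that follows.

# Hypothetical high-watermark pattern: select only rows changed since the previous load
last_load_ts = "2025-01-01 00:00:00"  # assumed watermark saved from the previous run

incremental_df = (
    spark.read.table("source_db.employees")                       # assumed source table
         .filter(f"updated_at > TIMESTAMP '{last_load_ts}'")      # assumed modification-time column
)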
Example: Upsert New Records Using Delta Lake’s MERGE API
Suppose you have a Delta table stored at /mnt/delta/my_table and you receive a new batch of records as a DataFrame called new_data_df. You can use the following code to merge (upsert) the incremental changes:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

# Example incremental data (new or updated rows)
new_data = [
    (1, "Alice", 70000.0),    # updated record with id=1
    (3, "Charlie", 80000.0)   # new record with id=3
]
columns = ["id", "name", "salary"]
new_data_df = spark.createDataFrame(new_data, columns)

# Reference the existing Delta table
deltaTable = DeltaTable.forPath(spark, "/mnt/delta/my_table")

# Perform MERGE to upsert the new data
deltaTable.alias("t").merge(
    new_data_df.alias("s"),
    "t.id = s.id"  # join condition: match records on id
).whenMatchedUpdate(
    set={
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": current_timestamp()
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": current_timestamp()
    }
).execute()
Explanation:
Merge Operation: The code matches incoming rows (aliased s) to existing rows (aliased t) on the id column.
When Matched: If a record with the same id already exists, it updates the name and salary columns and stamps the update time in last_updated.
When Not Matched: If no match is found, it inserts the incoming record into the table as a new row.
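If you want a quick sanity check after the merge, you can read the table back from the same path. This is an optional verification step, not part of the merge itself:

# Optional: read the Delta table back to verify the upserted rows
spark.read.format("delta").load("/mnt/delta/my_table").orderBy("id").show()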
This incremental load avoids reprocessing your entire dataset every time new data arrives, which keeps the pipeline efficient and is ideal for real-time or near-real-time analytics.
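For comparison, the same upsert can be expressed in SQL with Delta Lake's MERGE INTO statement. The sketch below assumes the Delta table is registered in the metastore as my_table (a path-based table can also be referenced as delta.`/mnt/delta/my_table`) and that the incoming batch is exposed as a temporary view:

# Expose the incremental batch to SQL (the view name is arbitrary)
new_data_df.createOrReplaceTempView("new_data")

spark.sql("""
    MERGE INTO my_table AS t
    USING new_data AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET
        t.name = s.name,
        t.salary = s.salary,
        t.last_updated = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (id, name, salary, last_updated)
        VALUES (s.id, s.name, s.salary, current_timestamp())
""")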