Tuesday, February 25, 2025

Data Lineage Example using SQL & Python

This example shows how to filter and aggregate data using Spark SQL

Load data from the source table

CREATE OR REPLACE TEMP VIEW sales_data AS SELECT * FROM delta.`/path/to/sales_data`;
-- Filter data for the last year and calculate total sales per product category
CREATE OR REPLACE TABLE transformed_sales_data AS
SELECT product_category, SUM(sales_amount) AS total_sales
FROM sales_data
WHERE year = 2024
GROUP BY product_category;
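You can check what the filter-and-aggregate query returns without a Spark cluster. The sketch below runs the same logic against an in-memory SQLite database from Python's standard library. It makes several assumptions. The helper name `total_sales_by_category` and the sample rows are hypothetical. The Delta path is replaced by a plain `sales_data` table, and an `ORDER BY` is added so the output order is stable. SQLite only stands in for Spark SQL here, so no lineage is recorded.

```python
import sqlite3


def total_sales_by_category(rows, year):
    """Run the example's filter + GROUP BY on hypothetical rows in an in-memory SQLite DB."""
    conn = sqlite3.connect(":memory:")
    try:
        # Plain table standing in for delta.`/path/to/sales_data` (assumption)
        conn.execute(
            "CREATE TABLE sales_data (product_category TEXT, sales_amount REAL, year INTEGER)"
        )
        conn.executemany("INSERT INTO sales_data VALUES (?, ?, ?)", rows)
        # Same filter and aggregation as the Spark SQL statement; ORDER BY added for stable output
        cur = conn.execute(
            "SELECT product_category, SUM(sales_amount) AS total_sales "
            "FROM sales_data WHERE year = ? "
            "GROUP BY product_category ORDER BY product_category",
            (year,),
        )
        return cur.fetchall()
    finally:
        conn.close()


# Hypothetical sample rows: (product_category, sales_amount, year)
rows = [
    ("books", 10.0, 2024),
    ("books", 5.0, 2024),
    ("games", 20.0, 2024),
    ("games", 99.0, 2023),  # excluded by the year filter
]
print(total_sales_by_category(rows, 2024))
```

Only 2024 rows contribute to each category total, which is exactly what the `WHERE year = 2024` clause in the Spark SQL version does before grouping.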

This example uses a JOIN operation to show how data lineage is captured when a table is built from more than one source table.

-- Load data from source tables
CREATE OR REPLACE TEMP VIEW customer_data AS SELECT * FROM delta.`/path/to/customer_data`;
CREATE OR REPLACE TEMP VIEW order_data AS SELECT * FROM delta.`/path/to/order_data`;
-- Join customer data with order data and calculate total order amount per customer
CREATE OR REPLACE TABLE customer_order_summary AS
SELECT c.customer_id, c.customer_name, SUM(o.order_amount) AS total_order_amount
FROM customer_data c
JOIN order_data o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name;
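After these statements run, `customer_order_summary` depends on two upstream sources. As a minimal illustration of what table-level lineage means, the hypothetical `LineageGraph` class below records an edge from each source to the object a statement creates. It then walks those edges upstream. This is an assumption-laden sketch, not the lineage API of Databricks or any other platform, and it tracks tables only, not columns.

```python
from collections import defaultdict


class LineageGraph:
    """Hypothetical table-level lineage: maps each target to the sources it was built from."""

    def __init__(self):
        self._parents = defaultdict(set)

    def record(self, target, *sources):
        # One source -> target edge per input, as a CREATE ... AS SELECT would imply
        self._parents[target].update(sources)

    def upstream(self, name):
        """Return every transitive source of `name` using a depth-first walk."""
        seen = set()
        stack = [name]
        while stack:
            # .get() avoids inserting empty entries into the defaultdict
            for parent in self._parents.get(stack.pop(), ()):
                if parent not in seen:
                    seen.add(parent)
                    stack.append(parent)
        return seen


graph = LineageGraph()
# Edges implied by the three statements in the JOIN example
graph.record("customer_data", "delta.`/path/to/customer_data`")
graph.record("order_data", "delta.`/path/to/order_data`")
graph.record("customer_order_summary", "customer_data", "order_data")
print(sorted(graph.upstream("customer_order_summary")))
```

Walking upstream from `customer_order_summary` reaches both temp views and both Delta paths. A lineage tool lets you answer this kind of question ("which sources feed this table?") without reading the SQL.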

This example shows how to filter and aggregate data using DataFrame API

from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataLineageExample").getOrCreate()
# Load data from the source table
sales_data = spark.read.format("delta").load("/path/to/sales_data")
# Filter data for the last year
filtered_data = sales_data.filter(sales_data["year"] == 2024)
# Calculate total sales per product category
transformed_sales_data = filtered_data.groupBy("product_category").sum("sales_amount").withColumnRenamed("sum(sales_amount)", "total_sales")
# Save the transformed data to a new Delta table
transformed_sales_data.write.format("delta").mode("overwrite").saveAsTable("transformed_sales_data")
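If you do not have a Spark environment, you can mirror the same four steps (filter, groupBy, sum, rename) in plain Python over a list of dicts. This is useful for checking small results by hand. The sketch below is a hypothetical reference, not a replacement for the DataFrame API. The function name `transform_sales` and the sample records are assumptions. Unlike Spark's `sum`, it does not skip null `sales_amount` values.

```python
from collections import defaultdict


def transform_sales(records, year):
    """Plain-Python mirror of filter -> groupBy -> sum -> rename (hypothetical helper)."""
    # Filter, like sales_data.filter(sales_data["year"] == 2024)
    filtered = [r for r in records if r["year"] == year]
    # Group by product_category and sum sales_amount, like groupBy(...).sum("sales_amount")
    totals = defaultdict(float)
    for r in filtered:
        totals[r["product_category"]] += r["sales_amount"]
    # Emit rows under the renamed column, like withColumnRenamed(..., "total_sales");
    # sorted by category because Spark does not guarantee row order after groupBy
    return [
        {"product_category": cat, "total_sales": total}
        for cat, total in sorted(totals.items())
    ]


# Hypothetical sample records
records = [
    {"product_category": "books", "sales_amount": 10.0, "year": 2024},
    {"product_category": "books", "sales_amount": 5.0, "year": 2024},
    {"product_category": "games", "sales_amount": 99.0, "year": 2023},
]
print(transform_sales(records, 2024))
```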

Using the DataFrame API for a JOIN operation

# Load data from source tables
customer_data = spark.read.format("delta").load("/path/to/customer_data")
order_data = spark.read.format("delta").load("/path/to/order_data")
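# Join customer data with order data on the shared key; passing the column name
# (instead of an equality expression) keeps a single customer_id column, so the
# groupBy("customer_id", ...) below is not ambiguous
joined_data = customer_data.join(order_data, on="customer_id")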
# Calculate total order amount per customer
customer_order_summary = joined_data.groupBy("customer_id", "customer_name").sum("order_amount").withColumnRenamed("sum(order_amount)", "total_order_amount")
# Save the transformed data to a new Delta table
customer_order_summary.write.format("delta").mode("overwrite").saveAsTable("customer_order_summary")
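The choice of join condition matters for the `groupBy` that follows. A join on an equality expression keeps the key column from both sides, while a join on the column name keeps one copy. The same rule applies in standard SQL (`ON` versus `USING`). You can see it with Python's built-in `sqlite3` module. In this sketch, the helper `joined_columns` and the two empty tables are hypothetical. SQLite stands in for Spark only to show the resulting column list.

```python
import sqlite3


def joined_columns(using_key):
    """Return the column names produced by SELECT * over a join of two hypothetical tables."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE customer_data (customer_id INTEGER, customer_name TEXT)")
        conn.execute("CREATE TABLE order_data (customer_id INTEGER, order_amount REAL)")
        if using_key:
            # Join on the column name: one customer_id column in the result
            join = "JOIN order_data USING (customer_id)"
        else:
            # Join on an equality expression: customer_id appears twice
            join = "JOIN order_data ON customer_data.customer_id = order_data.customer_id"
        cur = conn.execute(f"SELECT * FROM customer_data {join}")
        # cursor.description is populated even when the query returns no rows
        return [col[0] for col in cur.description]
    finally:
        conn.close()


print(joined_columns(using_key=False))
print(joined_columns(using_key=True))
```

With the duplicated key, an unqualified reference to `customer_id` cannot be resolved. This is why the SQL version qualifies it as `c.customer_id`.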
