This example shows how to filter and aggregate data using Spark SQL
-- Load data from the source table
CREATE OR REPLACE TEMP VIEW sales_data AS
SELECT * FROM delta.`/path/to/sales_data`;
-- Filter data for the year 2024 and calculate total sales per product category
CREATE OR REPLACE TABLE transformed_sales_data AS
SELECT
  product_category,
  SUM(sales_amount) AS total_sales
FROM sales_data
WHERE year = 2024
GROUP BY product_category;
This example uses a JOIN operation to show how data lineage is captured across multiple source tables
-- Load data from source tables
CREATE OR REPLACE TEMP VIEW customer_data AS
SELECT * FROM delta.`/path/to/customer_data`;

CREATE OR REPLACE TEMP VIEW order_data AS
SELECT * FROM delta.`/path/to/order_data`;
-- Join customer data with order data and calculate total order amount per customer
CREATE OR REPLACE TABLE customer_order_summary AS
SELECT
  c.customer_id,
  c.customer_name,
  SUM(o.order_amount) AS total_order_amount
FROM customer_data c
JOIN order_data o
  ON c.customer_id = o.customer_id
GROUP BY
  c.customer_id,
  c.customer_name;
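Once these tables are created, Databricks captures the lineage from the Delta sources through the temp views to the target tables. As a rough sketch of how you might inspect that lineage, assuming a Databricks notebook (where spark is predefined) with Unity Catalog lineage and system tables enabled, you can query the system.access.table_lineage system table (the column names below follow Databricks' published system-table layout):
# Sketch: inspect table-level lineage captured for customer_order_summary.
# Assumes Unity Catalog lineage and system tables are enabled in the workspace.
lineage = spark.sql("""
    SELECT source_table_full_name, target_table_full_name, event_time
    FROM system.access.table_lineage
    WHERE target_table_full_name LIKE '%customer_order_summary%'
    ORDER BY event_time DESC
""")
lineage.show(truncate=False)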
This example shows how to filter and aggregate data using the DataFrame API
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("DataLineageExample").getOrCreate()
# Load data from the source table
sales_data = spark.read.format("delta").load("/path/to/sales_data")
# Filter data for the year 2024
filtered_data = sales_data.filter(sales_data["year"] == 2024)
# Calculate total sales per product category
transformed_sales_data = (
    filtered_data
    .groupBy("product_category")
    .agg(F.sum("sales_amount").alias("total_sales"))
)
# Save the transformed data to a new Delta table
transformed_sales_data.write.format("delta").mode("overwrite").saveAsTable("transformed_sales_data")
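As a quick sanity check (assuming the default catalog and schema that saveAsTable wrote to above), the saved table can be read back by name:
# Read the saved table back by name and preview the aggregated results
spark.table("transformed_sales_data").show()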
Using the DataFrame API for a JOIN operation
# Load data from source tables
customer_data = spark.read.format("delta").load("/path/to/customer_data")
order_data = spark.read.format("delta").load("/path/to/order_data")
# Join customer data with order data on customer_id; joining on the column
# name (rather than an equality expression) keeps a single customer_id column,
# so the groupBy below is unambiguous
joined_data = customer_data.join(order_data, on="customer_id")
# Calculate total order amount per customer
customer_order_summary = (
    joined_data
    .groupBy("customer_id", "customer_name")
    .agg(F.sum("order_amount").alias("total_order_amount"))
)
# Save the transformed data to a new Delta table
customer_order_summary.write.format("delta").mode("overwrite").saveAsTable("customer_order_summary")
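Lineage is also tracked at the column level, so you can trace that total_order_amount is derived from order_amount. A minimal sketch, again assuming Unity Catalog lineage and system tables are enabled (system.access.column_lineage, per Databricks' system-table layout):
# Sketch: inspect column-level lineage for the aggregated output column.
# Assumes Unity Catalog lineage and system tables are enabled.
column_lineage = spark.sql("""
    SELECT source_table_full_name, source_column_name,
           target_table_full_name, target_column_name
    FROM system.access.column_lineage
    WHERE target_table_full_name LIKE '%customer_order_summary%'
""")
column_lineage.show(truncate=False)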