Thursday, April 24, 2025

How to flatten a complex JSON file - Example 2

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def flatten(df):
    """
    Recursively flattens a PySpark DataFrame with nested structures.

    For any column whose type is either ArrayType or StructType:
    - If it is a StructType, the function expands each field of the struct into a
      new column. The new column names are in the form "parentField_childField".
    - If it is an ArrayType, the function uses `explode_outer` to convert each
      element of the array into a separate row (which is useful for arrays of
      structs or primitive types).

    Parameters:
        df (DataFrame): The input DataFrame that may contain nested columns.

    Returns:
        DataFrame: A flattened DataFrame with no nested columns.
    """
    # Identify columns whose type is either ArrayType or StructType.
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, (ArrayType, StructType))}

    while complex_fields:
        for col_name, col_type in complex_fields.items():
            if isinstance(col_type, StructType):
                # For a struct: expand its fields as separate columns.
                expanded = [col(col_name + '.' + subfield.name).alias(col_name + '_' + subfield.name)
                            for subfield in col_type.fields]
                df = df.select("*", *expanded).drop(col_name)
            elif isinstance(col_type, ArrayType):
                # For an array: explode it so that each element becomes a separate row.
                df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the schema to check whether more nested columns remain.
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))}

    return df

Example Usage

if __name__ == "__main__":
    spark = SparkSession.builder.appName("FlattenNestedJson").getOrCreate()

    # Replace this with the path to your nested employee JSON file.
    json_file_path = "/path/to/employee_record.json"

    # Read the nested JSON file.
    df = spark.read.json(json_file_path)

    # Apply the flatten function.
    flat_df = flatten(df)

    # Display the flattened DataFrame.
    flat_df.show(truncate=False)

    spark.stop()



Detecting Complex Types: The function first builds a dictionary (complex_fields) that maps column names to their data types for every field that is a nested structure (either an array or a struct).

Processing Structs: For each field of type StructType, the code iterates over its subfields and creates new columns named in the pattern "parent_subfield". The original nested column is then dropped.

Processing Arrays: For columns with ArrayType, the function calls explode_outer, which converts each element of the array into a separate row while handling null or empty arrays gracefully (see the short sketch after these notes).

Iterative Flattening: After processing the current set of nested fields, the function rebuilds the dictionary to catch any newly exposed nested fields. This loop continues until no more complex types remain.
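
To make the null handling concrete, here is a minimal sketch of explode versus explode_outer (the tiny two-row DataFrame is an illustrative assumption, not part of the employee example, and an active SparkSession named spark is assumed):

from pyspark.sql.functions import explode_outer

# Two rows: one with a phone list, one with a NULL array.
demo = spark.createDataFrame(
    [(1, ["555-1234", "555-5678"]), (2, None)],
    "id INT, phones ARRAY<STRING>"
)

# explode_outer keeps id=2 with phone=NULL; a plain explode() would drop that row.
demo.select("id", explode_outer("phones").alias("phone")).show()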

How to flatten a complex JSON file - Example 1

from pyspark.sql.types import *
from pyspark.sql.functions import *

def flatten(df):
    # Compute the complex fields (lists and structs) in the schema.
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # If StructType, convert each sub-element to a column,
        # i.e. flatten the struct.
        if (type(complex_fields[col_name]) == StructType):
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # If ArrayType, add the array elements as rows using the explode function,
        # i.e. explode the array.
        elif (type(complex_fields[col_name]) == ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields in the schema.
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    return df


Sample Nested Employee Data

[
{
"employeeId": "E001",
"name": {
"first": "John",
"last": "Doe"
},
"contact": {
"email": "john.doe@example.com",
"phones": [
"555-1234",
"555-5678"
]
},
"address": {
"street": "123 Elm St",
"city": "Springfield",
"state": "IL",
"zipcode": "62704"
},
"department": {
"deptId": "D001",
"deptName": "Engineering"
},
"projects": [
{ "projectId": "P001",
"projectName": "Redesign Website", "duration": "3 months" },
{
"projectId": "P002",
"projectName": "Develop Mobile App",
"duration": "6 months"
}
]
},
{
"employeeId": "E002",
"name": {
"first": "Jane",
"last": "Smith"
},
"contact": {
"email": "jane.smith@example.com",
"phones": [
"555-9876"
]
},
"address": {
"street": "456 Oak St",
"city": "Riverside",
"state": "CA",
"zipcode": "92501"
},
"department": {
"deptId": "D002",
"deptName": "Marketing"
},
"projects": [
{
"projectId": "P003",
"projectName": "Product Launch",
"duration": "2 months"
}
]
}
]
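
If you save the array above to a file and run either flatten function over it, you should end up with roughly one row per phone/project combination. A minimal sketch, assuming the sample is stored at /path/to/employee_record.json and an active SparkSession named spark exists:

# multiLine is required because the sample is a pretty-printed JSON array.
df = spark.read.option("multiLine", True).json("/path/to/employee_record.json")

flat_df = flatten(df)
flat_df.printSchema()
# Expected flat columns include: employeeId, address_city, address_state, address_street,
# address_zipcode, contact_email, contact_phones, department_deptId, department_deptName,
# name_first, name_last, projects_duration, projects_projectId, projects_projectName.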

Thursday, April 17, 2025

PySpark PartitionBy - Example

Read a CSV file, group by Year to check the row counts, and then write the data partitioned by Year so that each year ends up in its own directory.

df = spark.read.format("csv").option("inferSchema", True).option("header", True).option("sep", ",").load("/FileStore/tables/baby_name/input/")

display(df)

df.groupBy("Year").count().show(truncate=False)

df.write.option("header", True).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

Read a CSV file and write it partitioned by both Year and Color, so each Year/Color combination gets its own subdirectory.

df.write.option("header", True).partitionBy("Year", "Color").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

Read a CSV file and write it partitioned by Year, additionally capping the number of records per output file (4,200 records per file in this example).

df.write.option("header", True).option("maxRecordsPerFile", 4200).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

dbutils.fs.rm("/FileStore/tables/flower_name/output/", True)

dbutils.fs.mkdirs("/FileStore/tables/flower_name/output/")

Below is an example of how you might partition a dataset of employee records by a year column using PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, to_date, col

# Initialize the Spark session.
spark = SparkSession.builder \
    .appName("EmployeeRecordsPartitioning") \
    .getOrCreate()

# Read the employee records from a CSV file. Adjust the file path and options as needed.
df = spark.read.csv("employee_records.csv", header=True, inferSchema=True)

# Optional: Convert the 'hire_date' column from string to date format.
# (Assumes hire_date is stored in "yyyy-MM-dd" format.)
df = df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd"))

# Extract the year from the 'hire_date' column.
# If your dataset already has a year column, this step isn't necessary.
df = df.withColumn("year", year(col("hire_date")))

# Display a few rows to verify the new 'year' column.
df.show()

# Write the DataFrame, partitioning the records into separate directories by the 'year' value.
# The resulting partitioned data is stored in Parquet format at the specified output path.
output_path = "/path/to/output/employee_partitioned"
df.write.mode("overwrite").partitionBy("year").parquet(output_path)

# Stop the Spark session.
spark.stop()
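
To see the benefit of this layout, you can read back a single year; a minimal sketch to run before spark.stop() (the year value 2023 is an illustrative assumption):

# Only the year=2023 subdirectory is scanned, thanks to partition pruning.
df_2023 = spark.read.parquet(output_path).filter(col("year") == 2023)
df_2023.show()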

Tuesday, April 15, 2025

Databricks EXPLAIN Plan

The Databricks EXPLAIN plan is a built‐in tool that lets you peek under the hood of your Spark SQL queries or DataFrame operations. Its main purpose is to show exactly how your high-level statement is translated, optimized, and executed across your cluster. Here’s a streamlined summary:

Multiple Layers of Query Representation:

Parsed Logical Plan: This is where Spark first interprets your query's syntax without yet resolving table or column names.

Analyzed Logical Plan: In this stage, Spark resolves these names and data types, transforming your raw query into one that reflects the structure of your data.

Optimized Logical Plan: Spark then applies various optimization rules such as predicate pushdown, projection pruning, and join reordering—essentially refining the query for efficiency without changing its result.

Physical Plan: Finally, the engine decides on specific execution strategies (like scans, joins, and shuffles) and constructs a plan that details how your operations will run on the cluster.

Modes of EXPLAIN:
Simple Mode (default): Shows only the final physical plan.
Extended Mode: Provides all stages—from the parsed plan through to the physical plan.
Formatted Mode: Organizes the output into a neat overview (physical plan outline) and detailed node information.
Cost and Codegen Modes: Offer additional insights such as cost statistics (when available) or even generated code for physical operations.
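
A minimal sketch of how these modes are requested on a DataFrame (the table name sales and the filter are illustrative assumptions):

df = spark.table("sales").filter("amount > 100").groupBy("region").count()

df.explain()                  # simple: final physical plan only
df.explain(extended=True)     # parsed, analyzed, optimized, and physical plans
df.explain(mode="formatted")  # physical plan outline plus per-node details
df.explain(mode="cost")       # logical plans with cost statistics where available
df.explain(mode="codegen")    # generated code for whole-stage code generation

The same output is available in SQL by prefixing a query with EXPLAIN, EXPLAIN EXTENDED, or EXPLAIN FORMATTED.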

Why It’s Valuable:

Debugging and Performance Tuning: By examining each layer, you can identify expensive operations (e.g., data shuffles) or inefficient join strategies, which is crucial for optimizing performance and debugging complex queries.
Understanding Spark’s Optimizations: It offers transparency into how Catalyst (Spark’s optimizer) works, helping you appreciate the transition from high-level code to the low-level execution tasks actually run on your hardware.
In essence, the Databricks EXPLAIN plan is like having a roadmap of how your data moves and transforms from the moment you write your query to the time results are delivered. This detail is invaluable for both debugging query issues and refining performance, especially as your datasets and transformations grow more complex.

Monday, April 14, 2025

Implementing Time Travel


One of Delta Lake’s standout features is time travel. Thanks to its transaction log, Delta Lake stores the entire change history of a table. This makes it possible to query older snapshots (or versions) of your data. Time travel is useful for auditing, debugging, and even reproducing models from historical data.

Example 1: Query by Version Number
# Read a previous version of the Delta table

df_previous = spark.read.format("delta") \
.option("versionAsOf", 3) \
.load("/mnt/delta/my_table")
df_previous.show()


Example 2: Query by Timestamp

# Read the table state as of a specific timestamp
df_as_of = spark.read.format("delta") \
.option("timestampAsOf", "2025-04-01 00:00:00") \
.load("/mnt/delta/my_table")
df_as_of.show()


Explanation:

Version-based Time Travel: The versionAsOf parameter allows you to specify the exact version of the table you wish to query.
Timestamp-based Time Travel: Alternatively, using timestampAsOf you can retrieve the table state as it existed at a particular time.
You can also use SQL to view the table’s history:

DESCRIBE HISTORY my_table;

This command lets you see all the changes (inserts, updates, deletes) that have been applied over time.
Time travel can be incredibly powerful for investigating issues or rolling back accidental changes, ensuring a higher degree of data reliability and auditability.
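
When a bad write does slip in, the rollback itself is a one-liner in SQL; a minimal sketch (the version number 3 is illustrative, and RESTORE requires a reasonably recent Delta Lake or Databricks runtime):

# Query the old snapshot directly in SQL to confirm it looks right.
spark.sql("SELECT * FROM my_table VERSION AS OF 3").show()

# Roll the table back to that snapshot.
spark.sql("RESTORE TABLE my_table TO VERSION AS OF 3")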
Wrapping Up

Delta Lake’s capabilities—incremental upsert via the MERGE API, file optimization through Z‑Ordering, and historical querying using time travel—enable you to build robust, high-performance data pipelines. They allow you to process only new or changed data, optimize query performance by reorganizing on-disk data, and easily access snapshots of your data from the past.

Optimizing Table Performance with Z‑Ordering


Over time, frequent incremental loads (plus file-level operations like compaction) can result in many small files. Queries filtering on certain columns might have to scan many files, which can slow performance. Z‑Ordering is a technique that reorganizes data on disk based on one or more columns. When your table is physically organized by those columns, queries that filter on them can skip reading irrelevant files.

Example: Optimize and Z‑Order a Delta Table

Once your Delta table has been updated with incremental loads, you can run the following command to improve query performance:

# Optimize the table and perform Z‑Ordering on the 'id' column
spark.sql("OPTIMIZE my_table ZORDER BY (id)")

Explanation:

OPTIMIZE Command: This command compacts small files into larger ones.
ZORDER BY: By ordering the data using the specified column (id in this case), Delta Lake clusters similar values together. This reduction in file-level fragmentation means that queries filtering on id will scan fewer files—cutting down the overall I/O and speeding up query execution.
Tip: You can Z‑Order on multiple columns if your query filters often include more than one attribute (e.g., ZORDER BY (country, city)).
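
A minimal sketch of that multi-column form, optionally scoped to recent partitions (this assumes the table is partitioned by a date column named event_date, which is an illustrative assumption):

# Z-Order on two commonly filtered columns.
spark.sql("OPTIMIZE my_table ZORDER BY (country, city)")

# Restrict the rewrite to recent partitions to keep the operation cheap.
spark.sql("OPTIMIZE my_table WHERE event_date >= '2025-04-01' ZORDER BY (country, city)")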

Performing Incremental Data Loads

When your data source continuously generates new or updated records, you don’t want to reload the entire dataset each time. Instead, you can load only the changes (i.e., incremental data) and merge them into your Delta table. Delta Lake provides the powerful MERGE API to do this.

Example: Upsert New Records Using Delta Lake’s MERGE API

Suppose you have a Delta table stored at /mnt/delta/my_table and you receive a new batch of records as a DataFrame called new_data_df. You can use the following code to merge (upsert) the incremental changes:

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp
# Example incremental data (new or updated rows)

new_data = [
(1, "Alice", 70000.0),
(3, "Charlie", 80000.0) # new record with id=3
]


columns = ["id", "name", "salary"]
new_data_df = spark.createDataFrame(new_data, columns)

# Reference the existing Delta table.
deltaTable = DeltaTable.forPath(spark, "/mnt/delta/my_table")

# Perform a MERGE to upsert the new data.
deltaTable.alias("t").merge(
    new_data_df.alias("s"),
    "t.id = s.id"  # join condition: match records on id
).whenMatchedUpdate(
    set={
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).execute()

Explanation:

Merge Operation: The code matches incoming rows (s) to existing rows (t) based on the id column.
When Matched: If the record exists, it updates certain columns (and records the update time).
When Not Matched: If no match is found, it inserts the new record into the table.
This incremental load avoids reprocessing your entire dataset every time new data arrives, making your processes efficient—ideal for real-time or near-real-time analytics.
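
The same upsert can also be written in SQL; a minimal sketch, assuming the incoming batch is exposed as a temporary view (the view name incoming_updates is an illustrative assumption) and the target table is addressed by its path:

new_data_df.createOrReplaceTempView("incoming_updates")

spark.sql("""
    MERGE INTO delta.`/mnt/delta/my_table` AS t
    USING incoming_updates AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET
        t.name = s.name,
        t.salary = s.salary,
        t.last_updated = current_timestamp()
    WHEN NOT MATCHED THEN INSERT (id, name, salary, last_updated)
        VALUES (s.id, s.name, s.salary, current_timestamp())
""")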

Data synchronization in Lakehouse

Data synchronization in Lakebase ensures that transactional data and analytical data remain up-to-date across the lakehouse and Postgres d...