Thursday, April 24, 2025

How to flatten a complex JSON file - Example 1

from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def flatten(df):
    # Compute the complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    while len(complex_fields) != 0:

        col_name = list(complex_fields.keys())[0]
        print("Processing: " + col_name + " Type: " + str(type(complex_fields[col_name])))

        # If StructType, convert each sub-element to a top-level column,
        # i.e. flatten the struct
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # If ArrayType, add the array elements as rows using explode_outer,
        # i.e. explode the array (explode_outer keeps rows whose array is null or empty)
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    return df
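
To make the struct branch concrete, here is a minimal, self-contained sketch of a single struct-flattening pass; the DataFrame, its schema, and the column names are invented for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# A small DataFrame where "name" is a struct column
df = spark.createDataFrame(
    [("E001", ("John", "Doe"))],
    "employeeId STRING, name STRUCT<first: STRING, last: STRING>")

# One pass of the StructType branch: promote each sub-field to a
# top-level column named parent_child, then drop the struct itself
expanded = [col("name." + k).alias("name_" + k) for k in ["first", "last"]]
df = df.select("*", *expanded).drop("name")
df.show()
# +----------+----------+---------+
# |employeeId|name_first|name_last|
# +----------+----------+---------+
# |      E001|      John|      Doe|
# +----------+----------+---------+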


Sample Nested Employee Data

[
  {
    "employeeId": "E001",
    "name": {
      "first": "John",
      "last": "Doe"
    },
    "contact": {
      "email": "john.doe@example.com",
      "phones": [
        "555-1234",
        "555-5678"
      ]
    },
    "address": {
      "street": "123 Elm St",
      "city": "Springfield",
      "state": "IL",
      "zipcode": "62704"
    },
    "department": {
      "deptId": "D001",
      "deptName": "Engineering"
    },
    "projects": [
      {
        "projectId": "P001",
        "projectName": "Redesign Website",
        "duration": "3 months"
      },
      {
        "projectId": "P002",
        "projectName": "Develop Mobile App",
        "duration": "6 months"
      }
    ]
  },
  {
    "employeeId": "E002",
    "name": {
      "first": "Jane",
      "last": "Smith"
    },
    "contact": {
      "email": "jane.smith@example.com",
      "phones": [
        "555-9876"
      ]
    },
    "address": {
      "street": "456 Oak St",
      "city": "Riverside",
      "state": "CA",
      "zipcode": "92501"
    },
    "department": {
      "deptId": "D002",
      "deptName": "Marketing"
    },
    "projects": [
      {
        "projectId": "P003",
        "projectName": "Product Launch",
        "duration": "2 months"
      }
    ]
  }
]
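
Assuming the sample above is saved to a file (the path employees.json below is hypothetical), a minimal driver could read it and apply flatten like this; the multiLine option is needed because each JSON record spans several lines:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FlattenJsonExample").getOrCreate()

# multiLine=True lets Spark parse a JSON array whose records span
# multiple lines, producing one row per array element
df = spark.read.option("multiLine", True).json("employees.json")

flat_df = flatten(df)
flat_df.printSchema()
flat_df.show(truncate=False)

Each struct becomes a set of parent_child columns (name_first, address_city, department_deptName, and so on), while each array is exploded into one row per element: E001, with two phone numbers and two projects, expands to four rows, and E002 to one.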
