from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer

def flatten(df):
    # Compute complex fields (arrays and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # If StructType, promote all sub-elements to top-level columns,
        # i.e. flatten structs
        if (type(complex_fields[col_name]) == StructType):
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # If ArrayType, turn the array elements into rows using explode_outer,
        # i.e. explode arrays (keeping rows whose array is null or empty)
        elif (type(complex_fields[col_name]) == ArrayType):
            df = df.withColumn(col_name, explode_outer(col_name))

        # Recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])

    return df
Sample Nested Employee Data
[
{
"employeeId": "E001",
"name": {
"first": "John",
"last": "Doe"
},
"contact": {
"email": "john.doe@example.com",
"phones": [
"555-1234",
"555-5678"
]
},
"address": {
"street": "123 Elm St",
"city": "Springfield",
"state": "IL",
"zipcode": "62704"
},
"department": {
"deptId": "D001",
"deptName": "Engineering"
},
"projects": [
{
"projectId": "P001",
"projectName": "Redesign Website",
"duration": "3 months"
},
{
"projectId": "P002",
"projectName": "Develop Mobile App",
"duration": "6 months"
}
]
},
{
"employeeId": "E002",
"name": {
"first": "Jane",
"last": "Smith"
},
"contact": {
"email": "jane.smith@example.com",
"phones": [
"555-9876"
]
},
"address": {
"street": "456 Oak St",
"city": "Riverside",
"state": "CA",
"zipcode": "92501"
},
"department": {
"deptId": "D002",
"deptName": "Marketing"
},
"projects": [
{
"projectId": "P003",
"projectName": "Product Launch",
"duration": "2 months"
}
]
}
]
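A minimal usage sketch: it assumes the sample data above has been saved to a file named employees.json and that a SparkSession is created locally (the file name and session setup are assumptions, not part of the original post).

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FlattenNestedEmployees").getOrCreate()

# multiLine is required because the sample file is a JSON array spanning multiple lines,
# not newline-delimited JSON
df = spark.read.option("multiLine", "true").json("employees.json")

flat_df = flatten(df)
flat_df.printSchema()
flat_df.show(truncate=False)

After flattening, the struct columns become prefixed columns such as name_first, address_city, department_deptName and projects_projectName, while the arrays are exploded into rows. Note that each exploded array multiplies rows: employee E001 has two phones and two projects, so it flattens to four rows, while E002 flattens to one.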