Tuesday, February 25, 2025

Flatten a nested DataFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Flatten DataFrame") \
    .getOrCreate()

# Sample nested JSON data

data = [
    {
        "id": 1,
        "name": "John",
        "subjects": [{"subject": "Math", "score": 90}, {"subject": "English", "score": 85}]
    },
    {
        "id": 2,
        "name": "Jane",
        "subjects": [{"subject": "Math", "score": 95}, {"subject": "Science", "score": 80}]
    }
]


# Create DataFrame from sample data

df = spark.read.json(spark.sparkContext.parallelize(data))

# Flatten the DataFrame

flattened_df = df.withColumn("subject", explode(col("subjects"))) \
    .select("id", "name", col("subject.subject"), col("subject.score"))
# Show the flattened DataFrame
flattened_df.show()
# Stop the Spark session
spark.stop()

In this example:

We initialize a Spark session.
We create a DataFrame from nested JSON data.
We use the explode function to flatten the nested subjects array into individual rows.
We select the flattened columns and display the result.
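
A related one-step variant: Spark's SQL function inline() expands an array of structs into rows and columns in a single call, which can replace the explode-then-select pattern above. A minimal sketch against the same sample data (inlined_df is just an illustrative name):

# Alternative flattening: inline() turns the array of structs into rows,
# with one column per struct field (subject, score)
inlined_df = df.selectExpr("id", "name", "inline(subjects)")
inlined_df.show()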

----------------------------------------------------

The code snippet spark.read.json(spark.sparkContext.parallelize(data)) does the following:

Parallelizes Data: The spark.sparkContext.parallelize(data) part converts the data (which is a list of dictionaries in this case) into an RDD (Resilient Distributed Dataset), which is a fundamental data structure of Spark. This process spreads the data across multiple nodes in the Spark cluster, allowing parallel processing.

Reads JSON Data: The spark.read.json() part reads the parallelized RDD as a JSON dataset and converts it into a DataFrame. This step interprets the structure of the JSON data, defining the schema (the structure of columns) and their data types.

Combining these two parts, the code creates a DataFrame from the JSON data stored in the data variable, enabling you to perform various transformations and actions with the Spark DataFrame API.
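
Note that spark.read.json officially expects an RDD (or dataset) of JSON strings; passing an RDD of Python dictionaries, as above, still works but is deprecated in recent PySpark versions. A minimal sketch of the string-based variant, assuming each record is serialized with json.dumps:

import json

# Serialize each record to a JSON string before parallelizing,
# which is the input form spark.read.json documents
json_rdd = spark.sparkContext.parallelize([json.dumps(row) for row in data])
df = spark.read.json(json_rdd)

# Inspect the schema Spark inferred from the JSON records
df.printSchema()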


----------------------------------------------------

If you don't want to parallelize in-memory data, you can instead write it to a JSON file and read the file into a DataFrame. Note that spark.read.json expects JSON Lines (one JSON object per line) by default, so the file should be written in that format:

import json

# Write the data as JSON Lines: one JSON object per line,
# the format spark.read.json expects by default
with open('data.json', 'w') as f:
    for record in data:
        f.write(json.dumps(record) + '\n')

# Read the JSON Lines file into a DataFrame
df = spark.read.json('data.json')

# Show the DataFrame
df.show()
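
If the file instead holds a single JSON array (which is what json.dump(data, f) would produce), Spark can still parse it by treating the whole file as one document via the multiLine option; a minimal sketch:

# Read a file containing one JSON array (not JSON Lines) by enabling multiLine
df = spark.read.json('data.json', multiLine=True)
df.show()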
