from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Flatten DataFrame") \
    .getOrCreate()
# Sample nested JSON data
data = [
    {
        "id": 1,
        "name": "John",
        "subjects": [{"subject": "Math", "score": 90}, {"subject": "English", "score": 85}]
    },
    {
        "id": 2,
        "name": "Jane",
        "subjects": [{"subject": "Math", "score": 95}, {"subject": "Science", "score": 80}]
    }
]
# Create DataFrame from sample data
df = spark.read.json(spark.sparkContext.parallelize(data))
# Flatten the DataFrame
flattened_df = (
    df.withColumn("subject", explode(col("subjects")))
      .select("id", "name", col("subject.subject"), col("subject.score"))
)
# Show the flattened DataFrame
flattened_df.show()
# Stop the Spark session
spark.stop()
In this example:
We initialize a Spark session.
We create a DataFrame from nested JSON data.
We use the explode function to flatten the nested subjects array into individual rows.
We select the flattened columns and display the result.
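As a side note (not part of the original example), explode drops any row whose subjects array is null or empty. If you need to keep such rows, explode_outer is a drop-in replacement; a minimal sketch, assuming the same df as above:
from pyspark.sql.functions import col, explode_outer
# explode_outer keeps rows whose "subjects" array is null or empty
# (their subject/score columns come back as null); plain explode would drop them.
flattened_outer_df = (
    df.withColumn("subject", explode_outer(col("subjects")))
      .select("id", "name", col("subject.subject"), col("subject.score"))
)
flattened_outer_df.show()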
----------------------------------------------------
The code snippet spark.read.json(spark.sparkContext.parallelize(data)) does the following:
Parallelizes Data: The spark.sparkContext.parallelize(data) part converts the data (here, a list of dictionaries) into an RDD (Resilient Distributed Dataset), Spark's fundamental data structure. This distributes the records across the nodes of the Spark cluster, enabling parallel processing.
Reads JSON Data: The spark.read.json() part reads the parallelized RDD as a JSON dataset and converts it into a DataFrame, inferring the schema (the column names and their data types) from the structure of the JSON.
Combining these two parts, the code creates a DataFrame from the JSON data stored in the data variable, so you can then apply transformations and actions through the Spark DataFrame API.
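To make the two steps explicit, here is a minimal sketch that performs them separately, reusing the data list from above. It also serializes each record with json.dumps first, so the reader receives proper JSON strings instead of relying on Spark to stringify the Python dictionaries:
import json
# Step 1: distribute the records across the cluster as an RDD of JSON strings
rdd = spark.sparkContext.parallelize([json.dumps(record) for record in data])
# Step 2: let Spark infer the schema from the JSON and build a DataFrame
df2 = spark.read.json(rdd)
df2.printSchema()
df2.show(truncate=False)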
----------------------------------------------------
If you don't want to parallelize in-memory data, you can instead write it to a JSON file and read from that file, changing the code as follows:
import json
# Write the data to a JSON file (a single multi-line JSON array)
with open('data.json', 'w') as f:
    json.dump(data, f)
# Read the JSON file into a DataFrame.
# By default spark.read.json expects JSON Lines (one object per line), so the
# multiLine option is needed to parse a file written as a JSON array.
df = spark.read.option("multiLine", "true").json('data.json')
# Show the DataFrame
df.show()
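Alternatively, you can avoid the multiLine option by writing the file in JSON Lines format (one JSON object per line), which is what spark.read.json expects by default. A minimal sketch, using a hypothetical data.jsonl file name:
import json
# Write one JSON object per line (JSON Lines), the default layout spark.read.json expects
with open('data.jsonl', 'w') as f:
    for record in data:
        f.write(json.dumps(record) + '\n')
df = spark.read.json('data.jsonl')
df.show()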