Thursday, April 17, 2025

PySpark PartitionBy - Example

Read a CSV file, group it by Year to inspect the row counts, and write the data out so that each year's rows land in their own partition directory.

df.spark.read.format("csv").option("inferschema", True).option("header", True).option("sep", ",").load("/FileStore/tables/baby_name/input/")

display(df)

df.groupBy("Year").count().show(truncate=False)

df.write.option("header", True).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

Read a CSV file and write it partitioned by both Year and Color, so that each Year/Color combination gets its own subdirectory.

df.write.option("header", True).partitionBy("Year", "Color").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

Read a CSV file and, in addition to partitioning by Year, cap the number of records written to each output file (4,200 per file in this example, using maxRecordsPerFile).

df.write.option("header", True).option("maxRecordsPerFile", 4200).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")

dbutils.fs.rm("/FileStore/tables/flower_name/output/", True)

dbutils.fs.mkdir("/FileStore/tables/flower_name/output/")

Below is an example of how you might partition a dataset of employee records by a year column using PySpark.

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, to_date, col

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("EmployeeRecordsPartitioning") \
    .getOrCreate()

# Read the employee records from a CSV file. Adjust the file path and options as needed.
df = spark.read.csv("employee_records.csv", header=True, inferSchema=True)

# Optional: convert the 'hire_date' column from string to date format
# (assumes hire_date is stored in "yyyy-MM-dd" format).
df = df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd"))

# Extract the year from the 'hire_date' column. If your dataset already has
# a year column, this step isn't necessary.
df = df.withColumn("year", year(col("hire_date")))

# Display a few rows to verify the new 'year' column.
df.show()

# Write the DataFrame, partitioning the records into separate directories by
# the 'year' value. The partitioned data is stored in Parquet format at the
# specified output path.
output_path = "/path/to/output/employee_partitioned"
df.write.mode("overwrite").partitionBy("year").parquet(output_path)

# Stop the Spark session
spark.stop()
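
Once the write finishes, the partitioned Parquet output can be read back and filtered on the partition column, letting Spark prune to just the matching year directories. A small sketch, assuming the same output_path and using 2020 purely as an example year (run it before spark.stop(), or rebuild the session first):

df_partitioned = spark.read.parquet(output_path)
df_partitioned.filter("year = 2020").show()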
