Read a CSV file, group by Year to check the record count per year, and write the data partitioned by Year so each year's rows land in their own directory.
df = spark.read.format("csv").option("inferSchema", True).option("header", True).option("sep", ",").load("/FileStore/tables/baby_name/input/")
display(df)
df.groupBy("Year").count().show(truncate=False)
df.write.option("header", True).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
Read a CSV file and write it partitioned by both Year and Color, so each Year/Color combination gets its own nested directory.
df.write.option("header", True).partitionBy("Year", "Color").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
Read a CSV file and cap the number of records written to each output file, in this case a maximum of 4200 records per file, while still partitioning the data by Year.
df.write.option("header", True).option("maxRecordsPerFile", 4200).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
dbutils.fs.rm("/FileStore/tables/flower_name/output/", True)
dbutils.fs.mkdir("/FileStore/tables/flower_name/output/")
Below is an example of how you might partition a dataset of employee records by a year column using PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, to_date, col
# Initialize the Spark session
spark = SparkSession.builder \
.appName("EmployeeRecordsPartitioning") \
.getOrCreate()
# Read the employee records from a CSV file. Adjust the file path and options as needed.
df = spark.read.csv("employee_records.csv", header=True, inferSchema=True)
# Optional: Convert the 'hire_date' column from string to date format.
# (Assumes hire_date is stored in "yyyy-MM-dd" format)
df = df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd"))
# Extract the year from the 'hire_date' column. If your dataset already has a year column, this step isn't necessary.
df = df.withColumn("year", year(col("hire_date")))
# Display a few rows to verify the new 'year' column.
df.show()
# Write the DataFrame, partitioning the records into separate directories by the 'year' value.
# The resulting partitioned data is stored in Parquet format at the specified output path.
output_path = "/path/to/output/employee_partitioned"
df.write.mode("overwrite").partitionBy("year").parquet(output_path)
# Stop the Spark session
spark.stop()