Tuesday, February 25, 2025

DataFrame Basics

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

You can create a DataFrame from a variety of data sources, including CSV files, JSON files, and existing RDDs (Resilient Distributed Datasets).

# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)

# Read data from a JSON file
df = spark.read.json("/path/to/file.json")

# Create a DataFrame from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Catherine", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
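
The same createDataFrame call also accepts an existing RDD, as mentioned above. A minimal sketch (the sample tuples here are illustrative):

# Create an RDD of tuples via the Spark context, then convert it to a DataFrame
rdd = spark.sparkContext.parallelize([("Alice", 25), ("Bob", 30), ("Catherine", 28)])
df_from_rdd = spark.createDataFrame(rdd, ["Name", "Age"])
df_from_rdd.show()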

Basic DataFrame Operations

# Display the first few rows of the DataFrame
df.show()
# Print the schema of the DataFrame
df.printSchema()
# Select specific columns
df.select("Name", "Age").show()
# Filter rows based on a condition
df.filter(df["Age"] > 25).show()
# Group by a column and calculate the average age
df.groupBy("Name").avg("Age").show()

Writing DataFrames to Files

# Write DataFrame to a CSV file
df.write.csv("/path/to/output.csv", header=True)

# Write DataFrame to a JSON file
df.write.json("/path/to/output.json")
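
By default these writes fail if the output path already exists. If overwriting is intended, a save mode can be set first, as in this short sketch (reusing the placeholder output path):

# Overwrite any existing data at the target path instead of failing
df.write.mode("overwrite").csv("/path/to/output.csv", header=True)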

Transformations and Actions

In PySpark, there are two types of operations you can perform on a DataFrame: transformations and actions.
Transformations are operations that create a new DataFrame from an existing one (e.g., select, filter, groupBy). They are lazy, meaning they do not compute their results immediately.
Actions are operations that trigger computation and return a result to the driver program or write data to external storage (e.g., show, collect, write).
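
To illustrate the laziness (reusing the df from above), the transformations below only build an execution plan; nothing runs until an action such as count or collect is called:

# Transformations: Spark only records the plan, no data is processed yet
adults = df.filter(df["Age"] > 25).select("Name", "Age")
# Action: triggers the computation and returns the row count to the driver
print(adults.count())
# Action: collect returns the matching rows to the driver as a list of Row objects
rows = adults.collect()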

Example Workflow

from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# Display the first few rows
df.show()
# Print the schema
df.printSchema()
# Select and filter data
filtered_df = df.select("Name", "Age").filter(df["Age"] > 25)
# Group by and aggregate
grouped_df = filtered_df.groupBy("Name").avg("Age")
# Show the result
grouped_df.show()
# Write the result to a JSON file
grouped_df.write.json("/path/to/output.json")
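
When the script is done, the session can be stopped to release resources (optional in interactive notebooks, but good practice in standalone jobs):

# Stop the Spark session
spark.stop()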
