from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()
You can create a DataFrame from a variety of data sources, including CSV files, JSON files, existing RDDs (Resilient Distributed Datasets), and in-memory Python collections such as lists of tuples.
# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# Read data from a JSON file
df = spark.read.json("/path/to/file.json")
# Create a DataFrame from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Catherine", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
Basic DataFrame Operations
# Display the first few rows of the DataFrame
df.show()
# Print the schema of the DataFrame
df.printSchema()
# Select specific columns
df.select("Name", "Age").show()
# Filter rows based on a condition
df.filter(df["Age"] > 25).show()
# Group by a column and calculate the average age
df.groupBy("Name").avg("Age").show()
Writing DataFrames to Files
# Write DataFrame to a CSV file
df.write.csv("/path/to/output.csv", header=True)
# Write DataFrame to a JSON file
df.write.json("/path/to/output.json")
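Note that by default these write calls fail if the output path already exists, and each one produces a directory of part files rather than a single file. If you want different behavior when the path exists, you can pass a save mode; a minimal sketch:
# Overwrite any existing data at the output path instead of failing
df.write.mode("overwrite").csv("/path/to/output.csv", header=True)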
Transformations and Actions
In PySpark, there are two types of operations you can perform on a DataFrame: transformations and actions.
Transformations are operations that create a new DataFrame from an existing one (e.g., select, filter, groupBy). They are lazy, meaning they do not compute their results immediately; Spark only records them in a query plan.
Actions are operations that trigger execution of that plan and return a result to the driver program or write data to external storage (e.g., show, collect, write).
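For example, calling filter by itself does not read or process any data; the work happens only when an action such as count or show is invoked. A minimal sketch, assuming the df loaded earlier:
# filter is a transformation: it only extends the query plan, nothing is computed yet
adults = df.filter(df["Age"] > 25)
# count is an action: it triggers execution and returns a value to the driver
print(adults.count())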
Example Workflow
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# Display the first few rows
df.show()
# Print the schema
df.printSchema()
# Select and filter data
filtered_df = df.select("Name", "Age").filter(df["Age"] > 25)
# Group by and aggregate
grouped_df = filtered_df.groupBy("Name").avg("Age")
# Show the result
grouped_df.show()
# Write the result to a JSON file
grouped_df.write.json("/path/to/output.json")
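# Stop the Spark session to release resources when you are done
spark.stop()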