Data synchronization in Lakebase ensures that transactional data and analytical data remain up-to-date across the lakehouse and Postgres database without requiring complex ETL pipelines.
How It Works
Sync from Delta Lake: Lakebase allows automatic synchronization from Delta tables to Postgres tables, ensuring that data updates are reflected in real-time.
Managed Sync: Instead of manually moving data, Lakebase provides a fully managed synchronization process that continuously updates records.
Optional Secondary Indexes: Users can define indexes to optimize query performance on synchronized data.
Change Data Capture (CDC): Lakebase supports CDC, meaning it tracks inserts, updates, and deletes to maintain consistency.
Multi-Cloud Support: Synchronization works across different cloud environments, ensuring flexibility and scalability.
Key Benefits
Eliminates ETL Complexity: No need for custom pipelines—data flows seamlessly.
Real-Time Updates: Ensures low-latency access to fresh data.
Optimized for AI & ML: Supports feature serving and retrieval-augmented generation (RAG).
Secure & Governed: Works with Unity Catalog for authentication and data governance.
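Because a synced table is just a regular Postgres table behind Lakebase's Postgres endpoint, it can be read with any standard Postgres client. Below is a minimal sketch using psycopg2; the host, database, credentials, and the synced_orders table name are illustrative placeholders rather than real Lakebase values.
import psycopg2

# Connect to the Lakebase Postgres endpoint (all connection values are placeholders).
conn = psycopg2.connect(
    host="my-lakebase-instance.example.cloud",
    port=5432,
    dbname="databricks_postgres",
    user="app_service_principal",
    password="<token-or-password>",
    sslmode="require",
)

with conn, conn.cursor() as cur:
    # 'synced_orders' stands in for a Postgres table kept up to date by managed sync.
    cur.execute("SELECT order_id, status FROM synced_orders WHERE status = %s", ("OPEN",))
    for order_id, status in cur.fetchall():
        print(order_id, status)

conn.close()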
While data synchronization and data replication are often used interchangeably, they have distinct differences:
Data Synchronization
Ensures that two or more copies of data remain consistent and up-to-date.
Can involve incremental updates, meaning only changed data is transferred.
Often used in distributed systems where data needs to be continuously updated across multiple locations.
Example: Keeping a mobile app's local database in sync with a central cloud database.
Data Replication
Creates exact copies of data across multiple locations.
Typically involves bulk transfers, meaning entire datasets are copied.
Used for backup, disaster recovery, and load balancing.
Example: A read replica of a database used to distribute query load.
Key Differences
Synchronization focuses on keeping data updated across systems, while replication ensures identical copies exist.
Synchronization can be real-time or scheduled, whereas replication is often one-time or periodic.
Synchronization is more dynamic, while replication is more static.
Thursday, June 12, 2025
What is Lakebase
Lakebase is a new serverless Postgres database developed by Databricks. It is designed to integrate seamlessly with data lakehouses, making it easier to manage both transactional and analytical data in a single environment.
Lakebase is built for the AI era, supporting high-speed queries and scalability while eliminating the complexity of traditional database management. It allows developers to sync data between lakehouse tables and Lakebase records automatically, continuously, or based on specific conditions.
Seamless Integration: It connects operational databases with data lakes, eliminating silos between transactional and analytical workloads.
Scalability & Performance: Built on Postgres, it supports high-speed queries and efficient scaling for AI-driven applications.
Simplified Management: Fully managed by Databricks, reducing the complexity of provisioning and maintaining databases.
AI & ML Capabilities: Supports feature serving, retrieval-augmented generation (RAG), and other AI-driven workflows.
Multi-Cloud Support: Works across different cloud environments, ensuring flexibility and reliability.
Best Practices
Optimize Data Synchronization: Use managed sync between Delta Lake and Lakebase to avoid complex ETL pipelines.
Leverage AI & ML Features: Take advantage of feature serving and retrieval-augmented generation (RAG) for AI-driven applications.
Ensure Secure Access: Use Unity Catalog for authentication and governance, ensuring controlled access to data.
Monitor Performance: Regularly analyze query performance and optimize indexes to maintain efficiency.
Utilize Multi-Cloud Flexibility: Deploy across different cloud environments for scalability and reliability.
Thursday, May 8, 2025
Explain the query processing in PySpark
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[Department#1], functions=[sum(Salary#2L)])
   +- Exchange hashpartitioning(Department#1, 200), ENSURE_REQUIREMENTS, [id=#60]
      +- HashAggregate(keys=[Department#1], functions=[partial_sum(Salary#2L)])
         +- InMemoryTableScan [Department#1, Salary#2L]
               +- InMemoryRelation [Name#0, Department#1, Salary#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(1) Scan ExistingRDD[Name#0,Department#1,Salary#2L]
Let's break down what the physical plan is showing you:
1. **AdaptiveSparkPlan:**
The top node, `AdaptiveSparkPlan isFinalPlan=false`, indicates that Spark's Adaptive Query Execution (AQE) is enabled. AQE means Spark can adjust its physical plan at runtime based on the actual data and statistics. Here, it informs you that the current plan is not final and may be optimized further as the job executes.
2. **Final Global Aggregation (HashAggregate):**
The next node is a `HashAggregate` that groups data by the key `[Department#1]` and applies the aggregation function `sum(Salary#2L)`. This is the final step in computing the total salary per department: it merges the partial sums produced on each partition (see step 4) into the global result for each group.
3. **Data Exchange (Exchange):**
Before reaching the final aggregation, there's an `Exchange` node. This node handles the data shuffle by redistributing rows across 200 partitions based on hash partitioning of the department column. The exchange ensures that all rows for the same department end up in the same partition so that the subsequent aggregation can compute the final sum correctly. The `ENSURE_REQUIREMENTS` note indicates that Spark is satisfying physical properties (like partitioning) required by the following operators.
4. **Partial Aggregation (HashAggregate):**
Beneath the exchange, another `HashAggregate` node appears. This node computes partial sums of salaries per department. Partial aggregation is a common technique in distributed computing because it reduces the amount of data that has to be shuffled over the network by performing some of the aggregation locally within each partition.
5. **Data Source (InMemoryTableScan and InMemoryRelation):**
The data is ultimately sourced from an `InMemoryTableScan` on the columns `[Department#1, Salary#2L]`. This scan reads data from an `InMemoryRelation`, which is a cached version of your dataset stored in memory (with a storage level that includes disk fallback and a single replica). The `Scan ExistingRDD` at the bottom indicates that this cached DataFrame (or RDD) is being scanned to provide the required columns to the aggregation pipeline.
In summary, the plan shows that Spark is reading data from memory, performing a local (partial) aggregation to compute partial sums of salaries by department, then shuffling the data so that all rows with the same department are grouped together, and finally computing the global sum for each department. This multi-phase strategy (partial then final aggregation) is used to optimize performance and reduce data movement across nodes.
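For reference, a plan of this shape is what you would typically get from a simple groupBy/sum over a cached DataFrame; the sketch below uses toy data whose columns match the plan's schema (Name, Department, Salary) and prints its own physical plan with explain().
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sum_

spark = SparkSession.builder.appName("ExplainGroupBy").getOrCreate()

data = [("Alice", "Engineering", 90000),
        ("Bob", "Engineering", 85000),
        ("Carol", "Marketing", 70000)]
df = spark.createDataFrame(data, ["Name", "Department", "Salary"]).cache()

# Sum salaries per department; the physical plan shows the partial/final
# HashAggregate pair, the Exchange (shuffle), and the InMemoryTableScan.
result = df.groupBy("Department").agg(sum_("Salary").alias("TotalSalary"))
result.explain()
result.show()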
Thursday, April 24, 2025
How to flatten a complex JSON file - Example 2
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType
from pyspark.sql.functions import col, explode_outer
def flatten(df):
    """Flattens a PySpark DataFrame with nested structures.

    For any column whose type is ArrayType or StructType:
    - StructType: each field of the struct is expanded into a new column
      named "parentField_childField".
    - ArrayType: `explode_outer` converts each array element into a separate
      row (useful for arrays of structs or primitive types).

    Parameters:
        df (DataFrame): The input DataFrame that may contain nested columns.

    Returns:
        DataFrame: A flattened DataFrame with no nested columns.
    """
    # Identify columns whose type is either ArrayType or StructType.
    complex_fields = {field.name: field.dataType
                      for field in df.schema.fields
                      if isinstance(field.dataType, (ArrayType, StructType))}
    while complex_fields:
        for col_name, col_type in complex_fields.items():
            if isinstance(col_type, StructType):
                # For a struct: expand its fields as separate columns.
                expanded = [col(col_name + '.' + subfield.name).alias(col_name + '_' + subfield.name)
                            for subfield in col_type.fields]
                df = df.select("*", *expanded).drop(col_name)
            elif isinstance(col_type, ArrayType):
                # For an array, explode it so that each element becomes a separate row.
                df = df.withColumn(col_name, explode_outer(col_name))
        # Recompute the schema to check whether more nested columns remain.
        complex_fields = {field.name: field.dataType
                          for field in df.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))}
    return df
Example Usage
if __name__ == "__main__":
    spark = SparkSession.builder.appName("FlattenNestedJson").getOrCreate()

    # Replace this with the path to your nested employee JSON file.
    json_file_path = "/path/to/employee_record.json"

    # Read the nested JSON file.
    df = spark.read.json(json_file_path)

    # Apply the flatten function.
    flat_df = flatten(df)

    # Display the flattened DataFrame.
    flat_df.show(truncate=False)

    spark.stop()
Detecting Complex Types: The function first builds a dictionary (complex_fields) of column names pointing to their data types for any field that is a nested structure (either an array or a struct).
Processing Structs: For each field of type StructType, the code iterates over its subfields and creates new columns named in the pattern "parent_subfield". The original nested column is then dropped.
Processing Arrays: For columns with ArrayType, the function calls explode_outer which converts each element of the array into a separate row (ensuring that null or empty arrays are handled gracefully).
Iterative Flattening: After processing the current set of nested fields, the function rebuilds the dictionary to catch any newly exposed nested fields. This loop continues until no more complex types remain.
How to flatten a complex JSON file - Example 1
from pyspark.sql.types import *
from pyspark.sql.functions import *
def flatten(df):
    # compute Complex Fields (Lists and Structs) in Schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the Array Elements as Rows using the explode function,
        # i.e. explode Arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute remaining Complex Fields in Schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
Sample Nested Employee Data
[
{
"employeeId": "E001",
"name": {
"first": "John",
"last": "Doe"
},
"contact": {
"email": "john.doe@example.com",
"phones": [
"555-1234",
"555-5678"
]
},
"address": {
"street": "123 Elm St",
"city": "Springfield",
"state": "IL",
"zipcode": "62704"
},
"department": {
"deptId": "D001",
"deptName": "Engineering"
},
"projects": [
{ "projectId": "P001",
"projectName": "Redesign Website", "duration": "3 months" },
{
"projectId": "P002",
"projectName": "Develop Mobile App",
"duration": "6 months"
}
]
},
{
"employeeId": "E002",
"name": {
"first": "Jane",
"last": "Smith"
},
"contact": {
"email": "jane.smith@example.com",
"phones": [
"555-9876"
]
},
"address": {
"street": "456 Oak St",
"city": "Riverside",
"state": "CA",
"zipcode": "92501"
},
"department": {
"deptId": "D002",
"deptName": "Marketing"
},
"projects": [
{
"projectId": "P003",
"projectName": "Product Launch",
"duration": "2 months"
}
]
}
]
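Because the sample above is a JSON array that spans multiple lines, it would normally be read with the multiLine option before calling flatten(); the file path below is a placeholder.
# Read the multi-line JSON array and flatten it (path is a placeholder).
df = spark.read.option("multiLine", True).json("/FileStore/tables/employee_nested.json")
flat_df = flatten(df)
flat_df.printSchema()   # expect columns such as name_first, contact_phones, projects_projectName
flat_df.show(truncate=False)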
Thursday, April 17, 2025
PySpark PartitionBy - Example
Read a CSV file, check the record count per Year, and write the data partitioned by Year (one output directory per year).
df = spark.read.format("csv").option("inferSchema", True).option("header", True).option("sep", ",").load("/FileStore/tables/baby_name/input/")
display(df)
df.groupBy("Year").count().show(truncate=False)
df.write.option("header", True).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
Write the same data partitioned by both Year and Color, producing one directory per Year/Color combination.
df.write.option("header", True).partitionBy("Year", "Color").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
Write the data partitioned by Year while also capping the number of records per output file (4,200 in this example).
df.write.option("header", True).option("maxRecordsPerFile", 4200).partitionBy("Year").mode("overwrite").csv("/FileStore/tables/flower_name/output/")
# Clean up or recreate the output directory on DBFS if needed.
dbutils.fs.rm("/FileStore/tables/flower_name/output/", True)
dbutils.fs.mkdirs("/FileStore/tables/flower_name/output/")
Below is an example of how you might partition a dataset of employee records by a year column using PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, to_date, col

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("EmployeeRecordsPartitioning") \
    .getOrCreate()

# Read the employee records from a CSV file. Adjust the file path and options as needed.
df = spark.read.csv("employee_records.csv", header=True, inferSchema=True)

# Optional: Convert the 'hire_date' column from string to date format.
# (Assumes hire_date is stored in "yyyy-MM-dd" format)
df = df.withColumn("hire_date", to_date(col("hire_date"), "yyyy-MM-dd"))

# Extract the year from the 'hire_date' column.
# If your dataset already has a year column, this step isn't necessary.
df = df.withColumn("year", year(col("hire_date")))

# Display a few rows to verify the new 'year' column.
df.show()

# Write the DataFrame, partitioning the records into separate directories by the 'year' value.
# The resulting partitioned data is stored in Parquet format at the specified output path.
output_path = "/path/to/output/employee_partitioned"
df.write.mode("overwrite").partitionBy("year").parquet(output_path)

# Stop the Spark session
spark.stop()
Tuesday, April 15, 2025
Databricks EXPLAIN Plan
The Databricks EXPLAIN plan is a built‐in tool that lets you peek under the hood of your Spark SQL queries or DataFrame operations. Its main purpose is to show exactly how your high-level statement is translated, optimized, and executed across your cluster. Here’s a streamlined summary:
Multiple Layers of Query Representation:
Parsed Logical Plan: This is where Spark first interprets your query's syntax without yet resolving table or column names.
Analyzed Logical Plan: In this stage, Spark resolves these names and data types, transforming your raw query into one that reflects the structure of your data.
Optimized Logical Plan: Spark then applies various optimization rules such as predicate pushdown, projection pruning, and join reordering—essentially refining the query for efficiency without changing its result.
Physical Plan: Finally, the engine decides on specific execution strategies (like scans, joins, and shuffles) and constructs a plan that details how your operations will run on the cluster.
Modes of EXPLAIN:
Simple Mode (default): Shows only the final physical plan.
Extended Mode: Provides all stages—from the parsed plan through to the physical plan.
Formatted Mode: Organizes the output into a neat overview (physical plan outline) and detailed node information.
Cost and Codegen Modes: Offer additional insights such as cost statistics (when available) or even generated code for physical operations.
Why It’s Valuable:
Debugging and Performance Tuning: By examining each layer, you can identify expensive operations (e.g., data shuffles) or inefficient join strategies, which is crucial for optimizing performance and debugging complex queries.
Understanding Spark’s Optimizations: It offers transparency into how Catalyst (Spark’s optimizer) works, helping you appreciate the transition from high-level code to the low-level execution tasks actually run on your hardware.
In essence, the Databricks EXPLAIN plan is like having a roadmap of how your data moves and transforms from the moment you write your query to the time results are delivered. This detail is invaluable for both debugging query issues and refining performance, especially as your datasets and transformations grow more complex.
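To see these modes in practice, you can call explain() directly on a DataFrame or run EXPLAIN in SQL; the table and column names below are illustrative.
# 'sales' and 'region' are placeholder names for an existing table and column.
df = spark.table("sales").filter("region = 'EMEA'").groupBy("region").count()

df.explain()                  # simple mode: final physical plan only
df.explain(extended=True)     # parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(mode="formatted")  # physical plan overview plus per-node details

# The same modes are available in SQL:
spark.sql("EXPLAIN FORMATTED SELECT region, count(*) FROM sales GROUP BY region").show(truncate=False)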
Monday, April 14, 2025
Implementing Time Travel
One of Delta Lake’s standout features is time travel. Thanks to its transaction log, Delta Lake stores the entire change history of a table. This makes it possible to query older snapshots (or versions) of your data. Time travel is useful for auditing, debugging, and even reproducing models from historical data.
Example 1: Query by Version Number
# Read a previous version of the Delta table
df_previous = spark.read.format("delta") \
.option("versionAsOf", 3) \
.load("/mnt/delta/my_table")
df_previous.show()
Example 2: Query by Timestamp
# Read the table state as of a specific timestamp
df_as_of = spark.read.format("delta") \
.option("timestampAsOf", "2025-04-01 00:00:00") \
.load("/mnt/delta/my_table")
df_as_of.show()
Explanation:
Version-based Time Travel: The versionAsOf parameter allows you to specify the exact version of the table you wish to query.
Timestamp-based Time Travel: Alternatively, using timestampAsOf you can retrieve the table state as it existed at a particular time.
You can also use SQL to view the table’s history:
DESCRIBE HISTORY my_table;
This command lets you see all the changes (inserts, updates, deletes) that have been applied over time.
Time travel can be incredibly powerful for investigating issues or rolling back accidental changes, ensuring a higher degree of data reliability and auditability.
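If an investigation shows that a recent write was wrong, Delta Lake can also roll the table back to an earlier snapshot with RESTORE; the version number and timestamp below are illustrative.
# Roll the table back to an earlier version.
spark.sql("RESTORE TABLE my_table TO VERSION AS OF 3")

# Or restore to the state as of a specific timestamp.
spark.sql("RESTORE TABLE my_table TO TIMESTAMP AS OF '2025-04-01 00:00:00'")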
Wrapping Up
Delta Lake’s capabilities—incremental upsert via the MERGE API, file optimization through Z‑Ordering, and historical querying using time travel—enable you to build robust, high-performance data pipelines. They allow you to process only new or changed data, optimize query performance by reorganizing on-disk data, and easily access snapshots of your data from the past.
Optimizing Table Performance with Z‑Ordering
Over time, frequent incremental loads (plus file-level operations like compaction) can result in many small files. Queries filtering on certain columns might have to scan many files, which can slow performance. Z‑Ordering is a technique that reorganizes data on disk based on one or more columns. When your table is physically organized by those columns, queries that filter on them can skip reading irrelevant files.
Example: Optimize and Z‑Order a Delta Table
Once your Delta table has been updated with incremental loads, you can run the following command to improve query performance:
# Optimize the table and perform Z‑Ordering on the 'id' column
spark.sql("OPTIMIZE my_table ZORDER BY (id)")
Explanation:
OPTIMIZE Command: This command compacts small files into larger ones.
ZORDER BY: By ordering the data using the specified column (id in this case), Delta Lake clusters similar values together. This reduction in file-level fragmentation means that queries filtering on id will scan fewer files—cutting down the overall I/O and speeding up query execution.
Tip: You can Z‑Order on multiple columns if your query filters often include more than one attribute (e.g., ZORDER BY (country, city)).
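To verify that compaction actually reduced fragmentation, you can compare the table's file count before and after running OPTIMIZE; a small sketch using DESCRIBE DETAIL:
# Compare the number of data files backing the table before and after OPTIMIZE.
before = spark.sql("DESCRIBE DETAIL my_table").select("numFiles").first()[0]
spark.sql("OPTIMIZE my_table ZORDER BY (id)")
after = spark.sql("DESCRIBE DETAIL my_table").select("numFiles").first()[0]
print(f"files before: {before}, files after: {after}")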
Performing Incremental Data Loads
When your data source continuously generates new or updated records, you don’t want to reload the entire dataset each time. Instead, you can load only the changes (i.e., incremental data) and merge them into your Delta table. Delta Lake provides the powerful MERGE API to do this.
Example: Upsert New Records Using Delta Lake’s MERGE API
Suppose you have a Delta table stored at /mnt/delta/my_table and you receive a new batch of records as a DataFrame called new_data_df. You can use the following code to merge (upsert) the incremental changes:
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp
# Example incremental data (new or updated rows)
new_data = [
    (1, "Alice", 70000.0),
    (3, "Charlie", 80000.0)   # new record with id=3
]
columns = ["id", "name", "salary"]
new_data_df = spark.createDataFrame(new_data, columns)

# Reference the existing Delta table
deltaTable = DeltaTable.forPath(spark, "/mnt/delta/my_table")

# Perform MERGE to upsert new data
deltaTable.alias("t").merge(
    new_data_df.alias("s"),
    "t.id = s.id"   # join condition: match records on id
).whenMatchedUpdate(
    set={
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "id": "s.id",
        "name": "s.name",
        "salary": "s.salary",
        "last_updated": "current_timestamp()"
    }
).execute()
Explanation:
Merge Operation: The code matches incoming rows (s) to existing rows (t) based on the id column.
When Matched: If the record exists, it updates certain columns (and records the update time).
When Not Matched: If no match is found, it inserts the new record into the table.
This incremental load avoids reprocessing your entire dataset every time new data arrives, making your processes efficient—ideal for real-time or near-real-time analytics.
Tuesday, March 18, 2025
Commonly used DataFrame functions
Data Manipulation Functions
1. select(): Selects a subset of columns from the DataFrame.
2. filter(): Filters the DataFrame based on a condition.
3. where(): An alias for filter(); both accept either SQL-style string expressions or Column conditions.
4. groupBy(): Groups the DataFrame by one or more columns.
5. agg(): Performs aggregation operations on the grouped DataFrame.
6. join(): Joins two DataFrames based on a common column.
7. union(): Combines two DataFrames into a single DataFrame.
8. intersect(): Returns the intersection of two DataFrames.
9. exceptAll(): Returns the difference between two DataFrames.
Data Transformation Functions
1. withColumn(): Adds a new column to the DataFrame.
2. withColumnRenamed(): Renames an existing column in the DataFrame.
3. drop(): Drops one or more columns from the DataFrame.
4. cast(): Casts a column to a different data type.
5. orderBy(): Sorts the DataFrame by one or more columns.
6. sort(): An alias for orderBy(); sorts the DataFrame by one or more columns.
7. repartition(): Repartitions the DataFrame into a specified number of partitions.
Data Analysis Functions
1. count(): Returns the number of rows in the DataFrame.
2. sum(): Returns the sum of a column in the DataFrame.
3. avg(): Returns the average of a column in the DataFrame.
4. max(): Returns the maximum value of a column in the DataFrame.
5. min(): Returns the minimum value of a column in the DataFrame.
6. groupBy().pivot(): Pivots the DataFrame by a column and performs aggregation.
7. corr(): Returns the correlation between two columns in the DataFrame.
Data Inspection Functions
1. show(): Displays the first few rows of the DataFrame.
2. printSchema(): Prints the schema of the DataFrame.
3. dtypes: Returns the data types of the columns in the DataFrame.
4. columns: Returns the column names of the DataFrame.
5. head(): Returns the first few rows of the DataFrame.
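A short sketch that strings several of the functions above together on a toy DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("DataFrameFunctionsDemo").getOrCreate()

df = spark.createDataFrame(
    [("Alice", "Engineering", 90000),
     ("Bob", "Engineering", 85000),
     ("Carol", "Marketing", 70000)],
    ["name", "department", "salary"],
)

# select / filter / withColumn / withColumnRenamed
high_paid = (df.select("name", "department", "salary")
               .filter(F.col("salary") > 80000)
               .withColumn("salary_k", F.col("salary") / 1000)
               .withColumnRenamed("department", "dept"))

# groupBy / agg / orderBy
summary = (df.groupBy("department")
             .agg(F.count("*").alias("headcount"), F.avg("salary").alias("avg_salary"))
             .orderBy(F.col("avg_salary").desc()))

# show / printSchema
high_paid.show()
summary.show()
summary.printSchema()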
Why user defined function should be wrapped using UDF()
In PySpark, the udf function is used to wrap a user-defined function (UDF) so that it can be used with PySpark DataFrames. Here are some reasons why you should use the udf function:
1. Type Safety: When you use the udf function, you declare the return type of the UDF, so Spark knows the schema of the resulting column instead of having to infer it.
2. Serialization: PySpark needs to serialize the UDF and send it to the executors. The udf function takes care of serializing the UDF.
3. Registration: The udf function registers the UDF with PySpark, making it available for use with DataFrames.
4. Optimization: PySpark can optimize the execution of the UDF, such as reusing the UDF across multiple rows.
5. Integration with PySpark API: The udf function allows you to integrate your UDF with the PySpark API, making it easier to use with DataFrames.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define the UDF
def to_uppercase(s):
return s.upper()
# Wrap the UDF with the udf function
udf_to_uppercase = udf(to_uppercase, StringType())
# Use the UDF with a DataFrame
df = spark.createDataFrame([("John",), ("Mary",)], ["Name"])
df_uppercase = df.withColumn("Name_Uppercase", udf_to_uppercase(df["Name"]))
# Print the result
df_uppercase.show()
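The same wrapping can also be expressed with the decorator form of udf, which keeps the definition and the registration in one place; a small sketch (with a basic null guard added):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Decorator form: the function is wrapped into a UDF at definition time.
@udf(returnType=StringType())
def to_uppercase(s):
    return s.upper() if s is not None else None   # guard against null values

df = spark.createDataFrame([("John",), ("Mary",)], ["Name"])
df.withColumn("Name_Uppercase", to_uppercase("Name")).show()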
Friday, March 14, 2025
Databricks Platform Architecture - Control Plane & Compute Plane
Databricks Platform Architecture
The Databricks platform architecture consists of two main components: the Control Plane and the Data Plane (also known as the Compute Plane). Here's a breakdown of each component and what resides in the customer's cloud account:
Control Plane:
Purpose: The control plane hosts Databricks' backend services, including the web application, REST APIs, and account management.
Location: The control plane is managed by Databricks and runs within Databricks' cloud account.
Components: It includes services for workspace management, job scheduling, cluster management, and other administrative functions.
Data Plane (Compute Plane):
Purpose: The data plane is responsible for data processing and client interactions.
Location: The data plane can be deployed in two ways:
Serverless Compute Plane: Databricks compute resources run in a serverless compute layer within Databricks' cloud account.
Classic Compute Plane: Databricks compute resources run in the customer's cloud account (e.g., AWS, Azure, GCP). This setup provides natural isolation as it runs within the customer's own virtual network.
Components: It includes clusters, notebooks, and other compute resources used for data processing and analytics.
Customer's Cloud Account:
Workspace Storage: Each Databricks workspace has an associated storage bucket or account in the customer's cloud account. This storage contains:
Workspace System Data: Includes notebook revisions, job run details, command results, and Spark logs.
DBFS (Databricks File System): A distributed file system accessible within Databricks environments, used for storing and accessing data.
Thursday, March 13, 2025
Identify the segregation of business units across catalog as best practice.
Segregating business units across catalogs is considered a best practice for several reasons:
Data Isolation: By segregating business units across catalogs, you ensure that data is isolated and accessible only to the relevant business units. This helps maintain data security and privacy.
Access Control: It allows for more granular access control, enabling you to assign specific permissions to different business units. This ensures that users only have access to the data they need.
Simplified Management: Managing data and permissions becomes more straightforward when business units are segregated across catalogs. It reduces complexity and makes it easier to enforce data governance policies.
Compliance: Segregating business units helps in meeting regulatory and compliance requirements by ensuring that sensitive data is properly isolated and managed.
Performance Optimization: It can improve query performance by reducing the amount of data scanned and processed, as each catalog contains only the relevant data for a specific business unit.
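As a sketch of what this separation can look like in practice, each business unit gets its own catalog and only that unit's group is granted access; the catalog and group names below are illustrative.
# Catalog and group names are placeholders for your own business units.
for catalog, group in [("sales_catalog", "sales_analysts"),
                       ("finance_catalog", "finance_analysts")]:
    spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
    # Only the owning business unit's group can use (and build in) its catalog.
    spark.sql(f"GRANT USE CATALOG ON CATALOG {catalog} TO `{group}`")
    spark.sql(f"GRANT CREATE SCHEMA ON CATALOG {catalog} TO `{group}`")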
Identify using service principals for connections as best practice
Using service principals for connections is considered a best practice for several reasons:
Enhanced Security: Service principals provide a secure way to authenticate applications and services without relying on user credentials. This reduces the risk of exposing sensitive user credentials.
Least Privilege Access: Service principals can be granted the minimal permissions required to perform their tasks, following the principle of least privilege. This limits the potential damage in case of a security breach.
Automated Processes: Service principals are ideal for automated processes and scripts. They enable secure, consistent access to resources without requiring human intervention.
Compliance: Using service principals helps organizations comply with security policies and regulations by ensuring that service accounts are managed and secured properly.
Centralized Management: Service principals can be centrally managed through Azure Active Directory (AAD) or other identity providers, making it easier to monitor, audit, and control access.
Scalability: Service principals are designed to scale with your applications and services, providing a robust mechanism for authentication and authorization in dynamic and scalable environments.
Identify colocating metastores with a workspace as best practice
Colocating metastores with a workspace is considered a best practice for several reasons:
Performance Optimization: By colocating metastores with workspaces, you reduce latency and improve query performance. Data access and metadata retrieval are faster when they are in the same region.
Cost Efficiency: Colocating metastores and workspaces can help minimize data transfer costs. When data and metadata are in the same region, you avoid additional charges associated with cross-region data transfers.
Simplified Management: Managing data governance and access controls is more straightforward when metastores and workspaces are colocated. It ensures that policies and permissions are consistently applied across all data assets.
Data Compliance: Colocating metastores with workspaces helps in meeting data residency and compliance requirements. Many regulations mandate that data must be stored and processed within specific geographic regions.
Scalability: Colocating metastores with workspaces allows for better scalability. As your data and workloads grow, you can efficiently manage and scale resources within the same region.
Implement data object access control
Implementing data object access control is crucial for ensuring that only authorized users can access or modify data within your Databricks workspace. Here's a step-by-step guide on how to implement data object access control using Databricks Unity Catalog:
Step 1: Set Up Unity Catalog
Ensure Unity Catalog is enabled in your Databricks workspace. This involves configuring your metastore and setting up catalogs and schemas.
Step 2: Create Service Principals or Groups
Create service principals or groups in Azure Active Directory (AAD) or your identity provider to manage permissions.
Step 3: Define Roles and Permissions
Identify the roles and associated permissions needed for your data objects (e.g., read, write, manage).
Step 4: Assign Permissions to Catalogs, Schemas, and Tables
Use SQL commands to grant or revoke permissions on your data objects. Below are examples for different levels of the hierarchy:
Granting Permissions on a Catalog
GRANT USE CATALOG ON CATALOG <catalog_name> TO <principal>;
GRANT USE CATALOG ON CATALOG sales_catalog TO alice;
Granting Permissions on a Schema
GRANT USE SCHEMA ON SCHEMA <catalog_name>.<schema_name> TO <principal>;
GRANT USE SCHEMA ON SCHEMA sales_catalog.finance_db TO alice;
Granting Permissions on a Table
GRANT SELECT ON TABLE <catalog_name>.<schema_name>.<table_name> TO <principal>;
GRANT SELECT ON TABLE finance.sales.transactions TO alice;
Step 5: Implement Fine-Grained Access Control
Apply fine-grained access control by defining row-level and column-level security policies.
Example: Row-Level Security
CREATE SECURITY POLICY <policy_name> ON TABLE <catalog_name>.<schema_name>.<table_name>
WITH (FILTER = <condition>);
CREATE SECURITY POLICY restrict_sales ON TABLE finance.sales.transactions WITH (FILTER = country = 'USA');
This creates a policy named restrict_sales on the transactions table in the sales schema within the finance catalog, filtering records so that only rows where the country column equals 'USA' are visible.
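Unity Catalog also exposes row-level security through row filter functions bound to a table. The sketch below is illustrative only: it reuses the finance.sales.transactions table from the example above, the function name is an assumption, and you should check the Unity Catalog documentation for the exact syntax supported in your workspace.
# Illustrative sketch: define a boolean SQL UDF and attach it to the table as a row filter.
# Function and table names are assumptions carried over from the example above.
spark.sql("""
CREATE OR REPLACE FUNCTION finance.sales.us_only(country STRING)
RETURN country = 'USA'
""")
spark.sql("ALTER TABLE finance.sales.transactions SET ROW FILTER finance.sales.us_only ON (country)")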
Step 6: Monitor and Audit Access
Enable auditing to track access and modifications to data objects. Regularly review audit logs to ensure compliance with security policies.
Step 7: Use RBAC for Workspaces and Compute Resources
Implement Role-Based Access Control (RBAC) to manage access to workspaces and compute resources, ensuring that users have the appropriate level of access.
By following these steps, you can effectively implement data object access control in your Databricks environment, ensuring that data is secure and only accessible to authorized users.
Identify how to query a three-layer namespace
To query a three-layer namespace in Databricks Unity Catalog, you'll need to reference the catalog, schema, and table names in your SQL query. A three-layer namespace typically involves the following structure:
Catalog: The highest level in the namespace hierarchy.
Schema: A container within a catalog that holds tables and views.
Table: The actual data object you want to query.
Here's an example of how to query a three-layer namespace:
Example SQL Query
SELECT * FROM <catalog_name>.<schema_name>.<table_name>
WHERE <condition>;
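The same three-level name can be used from PySpark. A minimal sketch, assuming a hypothetical sales_catalog.finance_db.transactions table and an active SparkSession:
# `spark` is the active SparkSession (provided automatically in Databricks notebooks)
# Query a Unity Catalog table by its three-level name (catalog.schema.table)
df = spark.sql("SELECT * FROM sales_catalog.finance_db.transactions WHERE country = 'USA'")
# Equivalent DataFrame API form
df = spark.table("sales_catalog.finance_db.transactions").where("country = 'USA'")
df.show()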
Create a Databricks SQL (DBSQL) warehouse
To create a Databricks SQL (DBSQL) warehouse, follow these steps:
Log in to your Databricks account:
Go to the Databricks workspace where you want to create the SQL warehouse.
Navigate to SQL Warehouses:
From the left-hand sidebar, click on the "SQL" tab to access Databricks SQL features.
In the SQL workspace, click on the "SQL Warehouses" tab.
Create a new SQL Warehouse:
Click on the "Create SQL Warehouse" button.
Configure the SQL Warehouse:
Warehouse Name: Give your warehouse a meaningful name.
Cluster Size: Choose the appropriate cluster size for your workload.
Auto Stop: Set the auto stop time for the warehouse to save costs when it's not in use.
Spot Instances: Optionally, enable spot instances to reduce costs.
Set Access Controls:
Configure access controls and permissions for the SQL warehouse as needed.
Add users, groups, or service principals who should have access to the warehouse.
Create the SQL Warehouse:
Review all the settings and configurations.
Click on the "Create" button to launch your SQL warehouse.
How to create a UC-enabled all-purpose cluster
To create a Unity Catalog (UC)-enabled all-purpose cluster in Databricks, follow these steps:
Go to your Databricks workspace:
Log in to your Databricks account.
Navigate to your workspace.
Create a new cluster:
Click on the "Clusters" tab in the left-hand sidebar.
Click on the "Create Cluster" button.
Configure the cluster:
Cluster Name: Give your cluster a meaningful name.
Cluster Mode: Select "Standard" or "Single Node" based on your needs.
Databricks Runtime Version: Choose a runtime version that supports Unity Catalog. Make sure it's a UC-compatible version.
Node Type: Choose the appropriate node type for your workload.
Number of Workers: Specify the number of worker nodes.
Enable Unity Catalog:
In the "Advanced Options" section, locate the "Unity Catalog" settings.
Enable the Unity Catalog by selecting the appropriate option. This might involve specifying the UC metastore ID or other relevant configuration details.
Set Access Controls:
Configure access controls and permissions for the cluster as needed.
Add users, groups, or service principals who should have access to the cluster.
Create the Cluster:
Review all the settings and configurations.
Click on the "Create Cluster" button to launch your UC-enabled all-purpose cluster.
By following these steps, you'll have a cluster that can leverage Unity Catalog for enhanced data governance and management.
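Cluster creation can also be automated. Below is a minimal sketch using the Clusters REST API; the workspace URL, token, runtime version, and node type are placeholder values, and data_security_mode selects a Unity Catalog access mode (see the next section).
import requests

host = "https://<your-workspace>.azuredatabricks.net"   # placeholder workspace URL
token = "<personal-access-token>"                        # placeholder token

payload = {
    "cluster_name": "uc-all-purpose",
    "spark_version": "14.3.x-scala2.12",     # a UC-compatible Databricks Runtime (example value)
    "node_type_id": "Standard_DS3_v2",       # example Azure node type
    "num_workers": 2,
    "data_security_mode": "USER_ISOLATION",  # shared (standard) access mode for Unity Catalog
}

resp = requests.post(
    f"{host}/api/2.1/clusters/create",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
resp.raise_for_status()
print(resp.json())   # returns the new cluster_id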
Identify the cluster security modes compatible with Unity Catalog
The cluster security modes compatible with Unity Catalog are:
Single User Access Mode: This mode is recommended for workloads requiring privileged machine access or using RDD APIs, distributed ML, GPUs, Databricks Container Service, or R.
Shared Access Mode: Also known as Standard Access Mode, this mode is recommended for most workloads. It allows multiple users to attach and concurrently execute workloads on the same compute resource, providing significant cost savings and simplified cluster management.
What is a Service Principal
A service principal is an identity created for use with applications, hosted services, and automated tools to access specific Azure resources. It essentially acts as a security identity, similar to a user account, but specifically for services and applications. Service principals are a fundamental concept in managing and securing access in Azure Active Directory (AAD).
Key Features of Service Principals:
Authentication: Service principals authenticate and gain access to Azure resources using a client ID and client secret (password) or a certificate.
Access Control: You can assign roles to service principals, granting them specific permissions on Azure resources. This follows the principle of least privilege, where they get only the permissions necessary for their tasks.
Security: Service principals help maintain security by limiting the permissions of the service or application to only what it needs to function, reducing the risk of broader access that comes with user accounts.
Automation: Used to automate tasks and deploy applications, allowing seamless integration with CI/CD pipelines and other automated processes.
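As a concrete illustration, the sketch below authenticates as a service principal with the azure-identity library using the client-credentials flow; the tenant ID, client ID, and secret values are placeholders.
from azure.identity import ClientSecretCredential

tenant_id = "<tenant-id>"                 # placeholder AAD tenant
client_id = "<application-client-id>"     # placeholder app registration (service principal) ID
client_secret = "<client-secret>"         # placeholder secret

# The service principal authenticates with its own credentials instead of a user's
credential = ClientSecretCredential(tenant_id, client_id, client_secret)

# Request a token for an Azure resource scope (Azure Resource Manager here)
token = credential.get_token("https://management.azure.com/.default")
print("Token acquired, expires at:", token.expires_on)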
Unity Catalog Securables
Unity Catalog securables are objects defined in the Unity Catalog metastore on which privileges can be granted to a principal (user, service principal, or group). These securable objects are hierarchical and include:
Metastore: The top-level container for metadata.
Catalog: The first layer of the object hierarchy, used to organize data assets.
Schema: Also known as databases, schemas contain tables and views.
Table: The lowest level in the object hierarchy, tables can be external or managed.
View: A read-only object created from a query on one or more tables.
Materialized View: An object created from a query on one or more tables, reflecting the state of data when last refreshed.
Volume: Can be external or managed, used for storing data.
Function: A user-defined function or an MLflow registered model.
Model: An MLflow registered model, listed separately from other functions in Catalog Explorer.
These securable objects allow for granular control over data access and management within Unity Catalog.
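A minimal sketch of granting privileges at different levels of this hierarchy (catalog, schema, table, volume); all object and group names below are hypothetical.
# `spark` is the active SparkSession; privileges can be granted at each securable level
spark.sql("GRANT USE CATALOG ON CATALOG sales_catalog TO `analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA sales_catalog.finance_db TO `analysts`")
spark.sql("GRANT SELECT ON TABLE sales_catalog.finance_db.transactions TO `analysts`")
spark.sql("GRANT READ VOLUME ON VOLUME sales_catalog.finance_db.raw_files TO `analysts`")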
Compare Metastores and Catalogs
Metadata Management: Both metastores and catalogs manage metadata, but catalogs typically offer more advanced metadata management features.
Data Discovery and Governance: Catalogs provide more robust tools for data discovery, lineage tracking, and governance, whereas metastores focus primarily on storing and retrieving metadata.
Integration: Metastores can be a component within a catalog, providing the necessary metadata storage while the catalog offers additional functionalities for data governance and discovery.
Metastores:
Purpose: Metastores store metadata about the data assets in a system. Metadata includes information such as the schema, data types, location of the data, and other descriptive details.
Scope: Typically, a metastore provides a centralized repository for metadata across various data sources and databases.
Usage: Used by data processing engines to understand the structure and location of data, enabling efficient query execution and data management.
Examples: Hive Metastore, AWS Glue Data Catalog.
Catalogs:
Purpose: Catalogs provide a higher-level organizational structure for datasets, offering additional metadata management, data discovery, and governance capabilities.
Scope: Catalogs often include features for tagging, lineage tracking, data quality, and access control, making it easier to manage data assets within an organization.
Usage: Used by data stewards, analysts, and data scientists to discover, understand, and govern data assets. Catalogs may integrate with metastores to provide a comprehensive view of data.
Examples: Databricks Unity Catalog, Azure Purview, Alation Data Catalog.
Four areas of Data Governance
One of the four key areas of data governance is Data Quality. Ensuring that data is accurate, consistent, and reliable is fundamental to effective data governance. This includes defining data standards, implementing data validation processes, and continuously monitoring data quality to ensure that the data meets the organization’s requirements and can be trusted for decision-making.
Other important areas of data governance include:
Data Security and Privacy: Protecting data from unauthorized access and ensuring compliance with privacy regulations.
Data Management: Establishing processes and policies for data collection, storage, and lifecycle management.
Data Stewardship and Ownership: Assigning responsibility for data management and ensuring accountability for data integrity and usage.
Query optimization techniques
Z-Ordering is a data skipping technique used in data lakehouses, particularly in Databricks, that organizes data on disk to skip unnecessary reads, speeding up queries significantly. When compared to traditional data warehouses, Z-Ordering can offer substantial performance improvements.
Data skipping: This technique allows queries to bypass unnecessary data, reducing I/O operations. It's beneficial, but it often relies on other optimizations.
Z-Ordering: As explained, it clusters data to minimize I/O, dramatically improving query performance (see the OPTIMIZE example after this list).
Bin-packing: This arranges data to improve storage efficiency, which indirectly helps with query performance but isn't as impactful on its own.
Write as a Parquet file: This provides efficient storage and fast query capabilities but doesn't inherently optimize query execution.
Tuning the file size: Adjusting file sizes can help with performance but is more of a fine-tuning step rather than a core optimization strategy.
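A minimal sketch of applying Z-Ordering to a Delta table; the table and column names are hypothetical.
# `spark` is the active SparkSession
# Compact the table's files and cluster them by commonly filtered columns
spark.sql("OPTIMIZE events ZORDER BY (event_date, customer_id)")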
Thursday, March 6, 2025
Read a CSV file and plot a graph
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import os
import time
import glob
import argparse
# Create SparkSession
spark = SparkSession.builder.appName("Employee Graph").getOrCreate()
def read_csv_file(file_path):
    # Read CSV file into DataFrame
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # Register DataFrame as temporary view
    df.createOrReplaceTempView("employees")
    # Count employees by department
    department_counts = spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department").toPandas()
    return department_counts

def create_bar_chart(department_counts):
    plt.bar(department_counts['department'], department_counts['count'])
    plt.xlabel('Department')
    plt.ylabel('Number of Employees')
    plt.title('Employee Distribution by Department')
    plt.show()

def create_pie_chart(department_counts):
    plt.pie(department_counts['count'], labels=department_counts['department'], autopct='%1.1f%%')
    plt.title('Employee Distribution by Department')
    plt.show()

def add_employee_to_db(df):
    # Write DataFrame to SQLite database
    df.write.format("jdbc").option("url", "jdbc:sqlite:employees.db").option("driver", "org.sqlite.JDBC").option("dbtable", "employees").save()

def main():
    parser = argparse.ArgumentParser(description='Employee Graph')
    parser.add_argument('--chart', choices=['bar', 'pie'], help='Type of chart to create')
    args = parser.parse_args()
    while True:
        csv_files = glob.glob('*.csv')
        if len(csv_files) > 0:
            department_counts = read_csv_file(csv_files[-1])
            if args.chart == 'bar':
                create_bar_chart(department_counts)
            elif args.chart == 'pie':
                create_pie_chart(department_counts)
            # Add employees to SQLite database
            df = spark.read.csv(csv_files[-1], header=True, inferSchema=True)
            add_employee_to_db(df)
        time.sleep(60)

if __name__ == "__main__":
    main()
Note that this code uses the pyspark.sql module to read and manipulate the CSV data, and the matplotlib library to create the charts. The add_employee_to_db function writes the DataFrame to a SQLite database using the jdbc format.
Monday, March 3, 2025
Processing RDD and the use of Parallelize
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext("local", "RDD Example")
# Create an RDD from a list of data
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
# Perform some basic operations on the RDD
# 1. Collect: Gather all elements of the RDD
collected_data = rdd.collect()
print("Collected data:", collected_data)
# 2. Map: Apply a function to each element
mapped_rdd = rdd.map(lambda x: x * 2)
print("Mapped data:", mapped_rdd.collect())
# 3. Filter: Filter elements based on a condition
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print("Filtered data (even numbers):", filtered_rdd.collect())
# 4. Reduce: Aggregate elements using a function
sum_of_elements = rdd.reduce(lambda x, y: x + y)
print("Sum of elements:", sum_of_elements)
# 5. Count: Count the number of elements in the RDD
count_of_elements = rdd.count()
print("Count of elements:", count_of_elements)
# Stop the SparkContext
sc.stop()
Explanation: We initialize a SparkContext with a local master.
We create an RDD from a list of integers using the parallelize method.
We perform various operations on the RDD:
Collect: Gather all elements of the RDD and print them.
Map: Apply a function to each element (in this case, multiply by 2) and print the result.
Filter: Filter elements based on a condition (even numbers) and print the result.
Reduce: Aggregate elements by summing them and print the result.
Count: Count the number of elements in the RDD and print the result.
Using the parallelize method in PySpark is essential for several reasons:
Creating RDDs: parallelize allows you to create an RDD (Resilient Distributed Dataset) from an existing collection, such as a list or array. RDDs are fundamental data structures in Spark, enabling distributed data processing and fault tolerance.
Parallel Processing: When you use parallelize, the data is automatically distributed across the available computing resources (nodes) in the cluster. This means that operations on the RDD can be executed in parallel, significantly speeding up data processing.
Scalability: By parallelizing data, you can handle large datasets that wouldn't fit into the memory of a single machine. Spark distributes the data across the cluster, allowing you to process massive amounts of data efficiently.
Fault Tolerance: RDDs provide fault tolerance through lineage information. If a node fails during computation, Spark can recompute the lost data using the lineage information. This ensures the reliability of your data processing pipeline.
Ease of Use: The parallelize method simplifies the process of creating RDDs. You can quickly convert existing collections into RDDs and start applying transformations and actions using Spark's powerful API.
Here's a quick example to illustrate the use of parallelize:
from pyspark import SparkContext
# Initialize SparkContext
sc = SparkContext("local", "Parallelize Example")
# Create an RDD from a list of data using parallelize
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
# Perform a simple transformation (map) and action (collect)
result = rdd.map(lambda x: x * 2).collect()
# Print the result
print("Result:", result)
# Stop the SparkContext
sc.stop()
In this example, parallelize creates an RDD from a list of integers, and the data is distributed across the cluster for parallel processing. We then apply a simple map transformation to double each element and collect the results
Thursday, February 27, 2025
Delta Lake and Apache Kafka
The technical features of both Delta Lake and Apache Kafka:
Delta Lake
ACID Transactions: Delta Lake ensures reliable and consistent data operations by supporting ACID (Atomicity, Consistency, Isolation, Durability) transactions.
Schema Enforcement and Evolution: It prevents schema mismatches while allowing gradual updates, ensuring data quality.
Time Travel: Delta Lake enables querying previous versions of data, which is useful for auditing and recovering from accidental changes.
Optimized Metadata Management: This feature improves query performance by efficiently managing metadata.
Scalability: Delta Lake supports both batch processing and real-time streaming analytics, making it suitable for large-scale data applications.
Apache Kafka
Scalability: Kafka can handle scalability in all four dimensions—event producers, event processors, event consumers, and event connectors—without downtime.
High-Volume Data Handling: Kafka can work with huge volumes of data streams efficiently.
Data Transformations: Kafka offers provisions for deriving new data streams from existing ones.
Fault Tolerance: The Kafka cluster can handle failures with masters and databases, ensuring reliability.
Durability: Kafka uses a distributed commit log, meaning messages persist on disk as fast as possible, ensuring data durability.
Performance: Kafka maintains high throughput for both publishing and subscribing messages, even with large volumes of data.
Zero Downtime: Kafka guarantees zero downtime and zero data loss, making it highly reliable.
Extensibility: Kafka offers various ways for applications to plug in and make use of its features, including writing new connectors as needed.
Both Delta Lake and Kafka have unique strengths that make them valuable for different aspects of data processing and analytics. Delta Lake excels in ensuring data reliability and consistency, while Kafka is a robust platform for handling high-velocity data streams.
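To illustrate how the two fit together, the sketch below reads a Kafka topic with Spark Structured Streaming and appends it to a Delta table. The broker address, topic, checkpoint path, and table name are placeholders, and the environment is assumed to have the Kafka connector and Delta Lake available.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("KafkaToDelta").getOrCreate()

# Read a Kafka topic as a streaming DataFrame (placeholder broker and topic)
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "orders")
          .load())

# Kafka delivers key/value as binary; cast them to strings for downstream parsing
parsed = events.select(col("key").cast("string"), col("value").cast("string"), "timestamp")

# Continuously append the stream into a Delta table, tracking progress in a checkpoint
query = (parsed.writeStream
         .format("delta")
         .option("checkpointLocation", "/tmp/checkpoints/orders")
         .outputMode("append")
         .toTable("orders_bronze"))

query.awaitTermination()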
Databricks Learning Path
Databricks is a powerful platform for data engineering, analytics, and machine learning. Here are some important things to learn to get the most out of Databricks:
1. Understanding the Basics
Databricks Workspace: Learn how to navigate the Databricks workspace, which provides a centralized environment for collaboration.
Notebooks: Get familiar with Databricks notebooks, which are similar to Jupyter notebooks but designed for collaboration and flexibility.
2. Apache Spark Integration
Spark Basics: Understand the basics of Apache Spark, the distributed computing framework that powers Databricks.
Spark Configuration: Learn how Databricks handles Spark configuration automatically, allowing you to focus on building data solutions.
3. Data Engineering
Data Ingestion: Learn how to ingest data from various sources into Databricks.
Data Transformation: Understand how to transform and process data using Spark and Databricks.
Delta Lake: Get to know Delta Lake, which provides ACID transactions, schema enforcement, and real-time data consistency.
4. Data Science and Machine Learning
Model Training: Learn how to train machine learning models using Databricks.
Model Deployment: Understand how to deploy machine learning models in Databricks.
MLflow: Get familiar with MLflow, an open-source platform for managing the end-to-end machine learning lifecycle.
5. SQL Analytics
SQL Queries: Learn how to run SQL queries in Databricks.
Visualization: Understand how to create visualizations and dashboards using Databricks SQL Analytics.
6. Automation and Orchestration
Jobs: Learn how to create and manage jobs in Databricks to automate workflows.
Workflows: Understand how to orchestrate complex workflows using Databricks.
7. Security and Governance
Access Control: Learn how to manage access control and permissions in Databricks.
Data Governance: Understand how to implement data governance practices in Databricks.
8. Cloud Integration
Cloud Providers: Get familiar with integrating Databricks with major cloud providers like AWS, Azure, and Google Cloud.
9. Performance Optimization
Optimization Techniques: Learn techniques to optimize the performance of your Databricks workloads.
10. Certification and Training
Databricks Academy: Explore training and certification programs offered by Databricks to validate your skills and knowledge.
Tuesday, February 25, 2025
PySpark - Commonly used functions
Data Manipulation
1. withColumn(): Add a new column to a DataFrame.
2. withColumnRenamed(): Rename an existing column.
3. drop(): Drop one or more columns from a DataFrame.
4. cast(): Cast a column to a different data type.
Data Analysis
1. count(): Count the number of rows in a DataFrame.
2. sum(): Calculate the sum of a column.
3. avg(): Calculate the average of a column.
4. max(): Find the maximum value in a column.
5. min(): Find the minimum value in a column.
Data Transformation
1. explode(): Transform an array column into separate rows.
2. flatten(): Flatten a nested struct column.
3. split(): Split a string column into an array.
Dataframe Operations
1. distinct(): Return a DataFrame with unique rows.
2. intersect(): Return a DataFrame with rows common to two DataFrames.
3. exceptAll(): Return a DataFrame with rows in the first DataFrame but not in the second.
4. repartition(): Repartition a DataFrame to increase or decrease the number of partitions.
5. coalesce(): Coalesce a DataFrame to reduce the number of partitions.
Data Manipulation
1. orderBy(): Order a DataFrame by one or more columns.
2. sort(): Sort a DataFrame by one or more columns.
3. limit(): Limit the number of rows in a DataFrame.
4. sample(): Return a sampled subset of a DataFrame.
5. randomSplit(): Split a DataFrame into multiple DataFrames randomly.
Data Analysis
1. corr(): Calculate the correlation between two columns.
2. cov(): Calculate the covariance between two columns.
3. skewness(): Calculate the skewness of a column.
4. kurtosis(): Calculate the kurtosis of a column.
5. approxQuantile(): Calculate an approximate quantile of a column.
Data Transformation
1. udf(): Create a user-defined function (UDF) to transform data.
2. apply(): Apply a UDF to a column.
3. transform(): Transform a DataFrame using a UDF.
4. map(): Map a DataFrame to a new DataFrame using a UDF.
String Functions
1. concat(): Concatenate two or more string columns.
2. length(): Calculate the length of a string column.
3. lower(): Convert a string column to lowercase.
4. upper(): Convert a string column to uppercase.
5. trim(): Trim whitespace from a string column.
Date and Time Functions
1. current_date(): Return the current date.
2. current_timestamp(): Return the current timestamp.
3. date_format(): Format a date column.
4. hour(): Extract the hour from a timestamp column.
5. dayofweek(): Extract the day of the week from a date column.
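A minimal sketch combining a few of the functions listed above on a small sample DataFrame; the column names and values are made up for illustration.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper, length, current_date

spark = SparkSession.builder.appName("CommonFunctions").getOrCreate()

df = spark.createDataFrame([("Alice", "25"), ("Bob", "30")], ["Name", "Age"])

result = (df
          .withColumn("Age", col("Age").cast("int"))        # cast a string column to int
          .withColumn("Name_Upper", upper(col("Name")))     # string function
          .withColumn("Name_Length", length(col("Name")))   # length of the string
          .withColumn("Load_Date", current_date())          # current date
          .withColumnRenamed("Name", "Employee"))           # rename a column

result.show()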
PySpark - Union (combine two data frames)
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data for DataFrame 1
data1 = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "Chicago")]
# Sample data for DataFrame 2
data2 = [("David", 40, "San Francisco"),
("Eve", 45, "Miami"),
("Frank", 50, "Seattle")]
# Create DataFrames
columns = ["Name", "Age", "City"]
df1 = spark.createDataFrame(data1, columns)
df2 = spark.createDataFrame(data2, columns)
# Show the DataFrames
df1.show()
df2.show()
# Perform a union operation
union_df = df1.union(df2)
# Show the result
union_df.show()
PySpark - Inner Join
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data for DataFrame 1
data1 = [("Alice", 25, "New York"), ("Bob", 30, "Los Angeles"), ("Charlie", 35, "Chicago")]
# Sample data for DataFrame 2
data2 = [("Alice", "F"), ("Bob", "M"), ("Charlie", "M")]
# Create DataFrames
columns1 = ["Name", "Age", "City"]
columns2 = ["Name", "Gender"]
df1 = spark.createDataFrame(data1, columns1)
df2 = spark.createDataFrame(data2, columns2)
# Show the DataFrames
df1.show()
df2.show()
# Perform a join operation
joined_df = df1.join(df2, on="Name", how="inner")
# Show the result
joined_df.show()
PySpark Functions - filter, groupBy, agg
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, max
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data
data = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "Chicago")]
# Create DataFrame
columns = ["Name", "Age", "City"]
df = spark.createDataFrame(data, columns)
# Show the DataFrame
df.show()
# Filter the DataFrame
filtered_data = df.where(df.Age > 30)
filtered_data.show()
# Group by 'City' and calculate the average 'Age'
grouped_data = df.groupBy("City").agg(avg("Age").alias("Average_Age"))
# Show the result
grouped_data.show()
# Use the agg() function to calculate the average and maximum age for each city
aggregated_data = df.groupBy("City").agg(avg("Age").alias("Average_Age"), max("Age").alias("Max_Age"))
# Show the result
aggregated_data.show()
PySpark - Filter
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data
data = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "Chicago")]
# Create DataFrame
columns = ["Name", "Age", "City"]
df = spark.createDataFrame(data, columns)
# Show the DataFrame
df.show()
# Filter the DataFrame
filtered_data = df.filter(df.Age > 30)
# Show the result
filtered_data.show()
PySpark - Select
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Sample data
data = [("Alice", 25, "New York"),
("Bob", 30, "Los Angeles"),
("Charlie", 35, "Chicago")]
# Create DataFrame
columns = ["Name", "Age", "City"]
df = spark.createDataFrame(data, columns)
# Select specific columns
selected_columns = df.select("Name", "City")
# Show the result
selected_columns.show()
print(type(selected_columns))
In this example, a Spark session is created, and a DataFrame df is created with the columns 'Name', 'Age', and 'City'. The select method is used to create a new DataFrame selected_columns that only includes the 'Name' and 'City' columns.
How to use Split function in PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split
# Initialize a Spark session
spark = SparkSession.builder.appName("Split Column").getOrCreate()
# Sample data
data = [
(1, "John Doe"),
(2, "Jane Smith"),
(3, "Alice Johnson")
]
# Create DataFrame from sample data
df = spark.createDataFrame(data, ["id", "full_name"])
# Split the 'full_name' column into 'first_name' and 'last_name'
df_split = (df.withColumn("first_name", split(df["full_name"], " ").getItem(0))
              .withColumn("last_name", split(df["full_name"], " ").getItem(1)))
# Show the resulting DataFrame
df_split.show()
# Stop the Spark session
spark.stop()
In this example:
We initialize a Spark session.
We create a DataFrame from sample data with an id column and a full_name column.
We use the split function to split the full_name column into two new columns: first_name and last_name.
We display the resulting DataFrame.
The split function splits the full_name column into an array of strings based on the delimiter (a space in this case), and then we use getItem(0) and getItem(1) to extract the first and last names, respectively.
Flatten a nested DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
# Initialize a Spark session
spark = SparkSession.builder.appName("Flatten DataFrame").getOrCreate()
# Sample nested JSON data
data = [
{
"id": 1,
"name": "John",
"subjects": [{"subject": "Math", "score": 90}, {"subject": "English", "score": 85}]
},
{
"id": 2,
"name": "Jane",
"subjects": [{"subject": "Math", "score": 95}, {"subject": "Science", "score": 80}]
}
]
# Create DataFrame from sample data
df = spark.read.json(spark.sparkContext.parallelize(data))
# Flatten the DataFrame
flattened_df = df.withColumn("subject", explode(col("subjects"))).select("id", "name", col("subject.subject"), col("subject.score"))
# Show the flattened DataFrame
flattened_df.show()
# Stop the Spark session
spark.stop()
In this example:
We initialize a Spark session.
We create a DataFrame from nested JSON data.
We use the explode function to flatten the nested subjects array into individual rows.
We select the flattened columns and display the result.
----------------------------------------------------
The code snippet spark.read.json(spark.sparkContext.parallelize(data)) does the following:
Parallelizes Data: The spark.sparkContext.parallelize(data) part converts the data (which is a list of dictionaries in this case) into an RDD (Resilient Distributed Dataset), which is a fundamental data structure of Spark. This process spreads the data across multiple nodes in the Spark cluster, allowing parallel processing.
Reads JSON Data: The spark.read.json() part reads the parallelized RDD as a JSON dataset and converts it into a DataFrame. This step interprets the structure of the JSON data, defining the schema (the structure of columns) and their data types.
Combining these two parts, the code creates a DataFrame from the JSON data stored in the data variable, enabling you to perform various transformations and actions using Spark DataFrame API.
----------------------------------------------------
If you don't parallelize the data in memory, you need to read it from a data file instead and change the code as follows:
import json
# Write the data to a JSON file
with open('data.json', 'w') as f: json.dump(data, f)
# Read the JSON file into a DataFrame
df = spark.read.json('data.json')
# Show the DataFrame
df.show()
PySpark - Commonly used functions
Dataframe Operations
1. select(): Select specific columns from a DataFrame.
2. filter(): Filter rows based on conditions.
3. where(): An alias for filter(); both accept column expressions or SQL-style string conditions.
4. groupBy(): Group rows by one or more columns.
5. agg(): Perform aggregation operations (e.g., sum, count, avg).
6. join(): Join two DataFrames based on a common column.
7. union(): Combine two DataFrames into a single DataFrame.
Data Manipulation
1. withColumn(): Add a new column to a DataFrame.
2. withColumnRenamed(): Rename an existing column.
3. drop(): Drop one or more columns from a DataFrame.
4. cast(): Cast a column to a different data type.
Data Analysis
1. count(): Count the number of rows in a DataFrame.
2. sum(): Calculate the sum of a column.
3. avg(): Calculate the average of a column.
4. max(): Find the maximum value in a column.
5. min(): Find the minimum value in a column.
Data Transformation
1. explode(): Transform an array column into separate rows.
2. flatten(): Flatten an array of arrays into a single array.
3. split(): Split a string column into an array.
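Here's a minimal sketch that combines several of the functions above in one small pipeline; the column names and sample data are assumptions made purely for illustration, and a Spark session is created explicitly so the snippet is self-contained:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col
spark = SparkSession.builder.appName("CommonFunctionsExample").getOrCreate()
# Illustrative employee and department data (assumed column names)
emp = spark.createDataFrame(
    [("Alice", "HR", 5000), ("Bob", "IT", 6000), ("Carol", "IT", 7000)],
    ["name", "dept", "salary"])
dept = spark.createDataFrame(
    [("HR", "Building A"), ("IT", "Building B")],
    ["dept", "location"])
result = (emp.filter(col("salary") > 5000)            # filter(): keep rows matching a condition
             .join(dept, on="dept")                   # join(): combine on a common column
             .groupBy("dept", "location")             # groupBy(): group rows
             .agg(avg("salary").alias("avg_salary"))  # agg(): aggregate per group
             .withColumnRenamed("avg_salary", "average_salary"))  # rename a column
result.select("dept", "average_salary").show()        # select(): pick columns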
Collect - PySpark
Here's a sample of using the collect method in PySpark:
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
result = df.collect()
print(result)
Output:
[Row(Name='John', Age=25), Row(Name='Mary', Age=31), Row(Name='David', Age=42)]
The collect method returns all the rows in the DataFrame as a list of Row objects. Note that this can be memory-intensive for large DataFrames.
To display the age only from the first row, you can use the following code:
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
first_row = df.first()
age = first_row.Age
print(age)
Output:
25
Alternatively, you can use:
print(df.first().Age)
Both methods will display the age from the first row, which is 25.
In PySpark, collect() returns a list of Row objects.
collect()[0] refers to the first Row object in the returned list.
collect()[0][0] refers to the first element (or column value) within that first Row object.
So, collect()[0][0] essentially gives you the value of the first column in the first row of the DataFrame.
Here's an example:
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
result = df.collect()
print(result[0][0])
# Output: John
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
result = df.collect()
print(result)
Output:
[Row(Name='John', Age=25), Row(Name='Mary', Age=31), Row(Name='David', Age=42)]
The collect method returns all the rows in the DataFrame as a list of Row objects. Note that this can be memory-intensive for large DataFrames.
To display the age only from the first row, you can use the following code:
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
first_row = df.first()
age = first_row.Age
print(age)
Output:
25
Alternatively, you can use:
print(df.first().Age)
Both methods will display the age from the first row, which is 25.
In PySpark, collect() returns a list of Row objects.
collect[0] refers to the first Row object in the list.
collect[0][0] refers to the first element (or column value) within that first Row object.
So, collect[0][0] essentially gives you the value of the first column in the first row of the DataFrame.
Here's an example:
data = [("John", 25), ("Mary", 31), ("David", 42)]
df = spark.createDataFrame(data, ["Name", "Age"])
result = df.collect()
print(result[0][0])
# Output: John
Explode - PySpark
In PySpark, the explode function is used to transform each element of a collection-like column (e.g., array or map) into a separate row.
Suppose we have a DataFrame df with a column fruits that contains an array of fruit names:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
spark = SparkSession.builder.appName("Explode Example").getOrCreate()
data = [ ("John", ["Apple", "Banana", "Cherry"]),
("Mary", ["Orange", "Grapes", "Peach"]),
("David", ["Pear", "Watermelon", "Strawberry"])
]
df = spark.createDataFrame(data, ["name", "fruits"])
df.show()
Output:
+-----+--------------------+
| name|              fruits|
+-----+--------------------+
| John| [Apple, Banana, ...|
| Mary|[Orange, Grapes, ...|
|David|[Pear, Watermelon...|
+-----+--------------------+
To display the contents of the "fruits" column only:
df.select("fruits").show(truncate=False)
- John: [Apple, Banana, Cherry]
- Mary: [Orange, Grapes, Peach]
- David: [Pear, Watermelon, Strawberry]
Now, let's use the explode function to transform each element of the fruits array into a separate row:
df_exploded = df.withColumn("fruit", explode("fruits"))
df_exploded.show()
Here's what's happening:
1. df.withColumn(): This method adds a new column to the existing DataFrame df.
2. "fruit": This is the name of the new column being added.
3. explode("fruits"): This is the transformation being applied to create the new column. Output:
+-----+--------------------+----------+
| name|              fruits|     fruit|
+-----+--------------------+----------+
| John| [Apple, Banana, ...|     Apple|
| John| [Apple, Banana, ...|    Banana|
| John| [Apple, Banana, ...|    Cherry|
| Mary|[Orange, Grapes, ...|    Orange|
| Mary|[Orange, Grapes, ...|    Grapes|
| Mary|[Orange, Grapes, ...|     Peach|
|David|[Pear, Watermelon...|      Pear|
|David|[Pear, Watermelon...|Watermelon|
|David|[Pear, Watermelon...|Strawberry|
+-----+--------------------+----------+
As you can see, the explode function has transformed each element of the fruits array into a separate row, with the corresponding name value.
To select only the "name" and "fruit" columns from the df_exploded DataFrame, you can use the select method:
df_name_fruit = df_exploded.select("name", "fruit")
df_name_fruit.show()
Output:
+-----+----------+
| name|     fruit|
+-----+----------+
| John|     Apple|
| John|    Banana|
| John|    Cherry|
| Mary|    Orange|
| Mary|    Grapes|
| Mary|     Peach|
|David|      Pear|
|David|Watermelon|
|David|Strawberry|
+-----+----------+
By using select, you're creating a new DataFrame (df_name_fruit) that contains only the specified columns.
DataFrame Basics
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()
You can create a DataFrame from a variety of data sources, including CSV files, JSON files, and existing RDDs (Resilient Distributed Datasets).
# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# Read data from a JSON file
df = spark.read.json("/path/to/file.json")
# Create a DataFrame from a list of tuples
data = [("Alice", 25), ("Bob", 30), ("Catherine", 28)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
Basic DataFrame Operations
# Display the first few rows of the DataFrame
df.show()
# Print the schema of the DataFrame
df.printSchema()
# Select specific columns
df.select("Name", "Age").show()
# Filter rows based on a condition
df.filter(df["Age"] > 25).show()
# Group by a column and calculate the average age
df.groupBy("Name").avg("Age").show()
Writing DataFrames to Files
# Write DataFrame to a CSV file
df.write.csv("/path/to/output.csv", header=True)
# Write DataFrame to a JSON file
df.write.json("/path/to/output.json")
Transformations and Actions
In PySpark, there are two types of operations you can perform on a DataFrame: transformations and actions.
Transformations are operations that create a new DataFrame from an existing one (e.g., select, filter, groupBy). They are lazy, meaning they do not immediately compute their results
Actions are operations that trigger computation and return a result to the driver program or write data to external storage (e.g., show, collect, write).
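A small sketch of this lazy-evaluation behavior (the sample data is illustrative):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LazyEvaluationExample").getOrCreate()
df = spark.createDataFrame([("Alice", 30), ("Bob", 22)], ["Name", "Age"])
# Transformations: nothing runs yet, Spark only records a logical plan
adults = df.filter(df["Age"] > 25).select("Name")
# Actions: these trigger the actual computation
adults.show()
print(adults.count())  # another action; re-executes the plan unless the DataFrame is cached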
Example Workflow
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# Read data from a CSV file
df = spark.read.csv("/path/to/file.csv", header=True, inferSchema=True)
# Display the first few rows
df.show()
# Print the schema
df.printSchema()
# Select and filter data
filtered_df = df.select("Name", "Age").filter(df["Age"] > 25)
# Group by and aggregate
grouped_df = filtered_df.groupBy("Name").avg("Age")
# Show the result
grouped_df.show()
# Write the result to a JSON file
grouped_df.write.json("/path/to/output.json")
Data Lineage Example using SQL & Python
This example shows how to filter and aggregate data using Spark SQL
Load data from the source table:
CREATE OR REPLACE TEMP VIEW sales_data AS
SELECT * FROM delta.`/path/to/sales_data`;
-- Filter data for the last year and calculate total sales per product category
CREATE OR REPLACE TABLE transformed_sales_data AS
SELECT product_category, SUM(sales_amount) AS total_sales
FROM sales_data
WHERE year = 2024
GROUP BY product_category;
Using a JOIN operation, you can see how the data lineage is captured:
-- Load data from source tables
CREATE OR REPLACE TEMP VIEW customer_data AS
SELECT * FROM delta.`/path/to/customer_data`;
CREATE OR REPLACE TEMP VIEW order_data AS
SELECT * FROM delta.`/path/to/order_data`;
-- Join customer data with order data and calculate total order amount per customer
CREATE OR REPLACE TABLE customer_order_summary AS
SELECT c.customer_id, c.customer_name, SUM(o.order_amount) AS total_order_amount
FROM customer_data c
JOIN order_data o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.customer_name;
This example shows how to filter and aggregate data using DataFrame API
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("DataLineageExample").getOrCreate()
# Load data from the source table
sales_data = spark.read.format("delta").load("/path/to/sales_data")
# Filter data for the last year
filtered_data = sales_data.filter(sales_data["year"] == 2024)
# Calculate total sales per product category
transformed_sales_data = filtered_data.groupBy("product_category") \
    .sum("sales_amount") \
    .withColumnRenamed("sum(sales_amount)", "total_sales")
# Save the transformed data to a new Delta table
transformed_sales_data.write.format("delta").mode("overwrite").saveAsTable("transformed_sales_data")
Using DataFrame API for JOIN Operation
# Load data from source tables
customer_data = spark.read.format("delta").load("/path/to/customer_data")
order_data = spark.read.format("delta").load("/path/to/order_data")
# Join customer data with order data
# Joining on the column name keeps a single customer_id column and avoids an ambiguous reference in the groupBy below
joined_data = customer_data.join(order_data, on="customer_id")
# Calculate total order amount per customer
customer_order_summary = joined_data.groupBy("customer_id", "customer_name") \
    .sum("order_amount") \
    .withColumnRenamed("sum(order_amount)", "total_order_amount")
# Save the transformed data to a new Delta table
customer_order_summary.write.format("delta").mode("overwrite").saveAsTable("customer_order_summary")
Data Lineage
Data lineage in Databricks refers to the ability to trace the path of data as it moves through various stages of processing within the Databricks platform. This includes tracking the origin, transformations, and final destination of data elements.
The data lineage can be visualized as a graph showing the relationships between the source and target tables, as well as the transformations applied. This helps you understand the flow of data and ensures transparency and traceability.
Setting up data lineage in Databricks involves using Unity Catalog, which automatically captures and visualizes data lineage across all your data objects. Here's a step-by-step guide to get you started:
Step-by-Step Setup Guide
Enable Unity Catalog: Ensure that your Databricks workspace has Unity Catalog enabled. This is a prerequisite for capturing data lineage.
Register Tables: Register your tables in a Unity Catalog metastore. This allows Unity Catalog to track and manage the metadata for your tables.
Run Queries: Execute your queries using Spark DataFrame (e.g., Spark SQL functions that return a DataFrame) or Databricks SQL interfaces. Unity Catalog will automatically capture the lineage information for these queries.
Visualize Lineage: Use the Catalog Explorer to visualize the data lineage. The lineage information is captured down to the column level and includes notebooks, jobs, and dashboards related to the query. You can view the lineage in near real-time.
Retrieve Lineage Programmatically: If needed, you can retrieve lineage information programmatically using the lineage system tables and the Databricks REST API. This allows you to integrate lineage data into your custom applications or workflows.
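As a rough sketch of that last step, the lineage system tables can be queried like any other table from a notebook. This assumes the system.access lineage tables are enabled in your workspace; the target table name (main.sales.transformed_sales_data) and the exact column set are illustrative assumptions, so verify the system table schema in your environment:
lineage_df = spark.sql("""
    SELECT source_table_full_name,
           target_table_full_name,
           entity_type,
           event_time
    FROM system.access.table_lineage
    WHERE target_table_full_name = 'main.sales.transformed_sales_data'
    ORDER BY event_time DESC
""")
lineage_df.show(truncate=False)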
Requirements
Unity Catalog must be enabled in your workspace.
Tables must be registered in a Unity Catalog metastore.
Queries must use Spark DataFrame or Databricks SQL interfaces.
Users must have the appropriate permissions to view lineage information.
Saturday, February 22, 2025
What is Coalesce and how it works
The coalesce function is used to reduce the number of partitions in a DataFrame or RDD (Resilient Distributed Dataset). This can be useful for optimizing the performance of certain operations by reducing the overhead associated with managing a large number of partitions.
How It Works
Reducing Partitions: The coalesce function reduces the number of partitions by merging existing partitions without performing a full shuffle of the data. This makes it a more efficient option compared to the repartition function when you need to decrease the number of partitions.
Use Case: It's commonly used when you need to optimize the performance of certain operations, such as writing data to disk or performing actions that benefit from fewer partitions.
Example Usage
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("CoalesceExample").getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3), ("Diana", 4)]
df = spark.createDataFrame(data, ["name", "value"])
# Coalesce DataFrame to 2 partitions
coalesced_df = df.coalesce(2)
# Show the number of partitions
print("Number of partitions after coalesce:", coalesced_df.rdd.getNumPartitions())
In this example, we start with a DataFrame that may have multiple partitions and use the coalesce function to reduce the number of partitions to 2.
Key Points
Efficiency: coalesce is more efficient than repartition for reducing the number of partitions because it avoids a full shuffle.
Fewer, Larger Partitions: It’s useful when downstream steps, such as writing output files, benefit from fewer, larger partitions (fewer small files and less task-scheduling overhead).
Optimization: Helps optimize resource usage by consolidating data into fewer partitions.
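Continuing with the df from the example above, here is a small sketch contrasting coalesce with repartition (the partition counts are arbitrary and chosen only for illustration):
# repartition() performs a full shuffle and can increase the partition count
df8 = df.repartition(8)
print(df8.rdd.getNumPartitions())   # 8
# coalesce() merges existing partitions without a full shuffle
df2 = df8.coalesce(2)
print(df2.rdd.getNumPartitions())   # 2
# coalesce() can only reduce the count; asking for more leaves it unchanged
print(df8.coalesce(16).rdd.getNumPartitions())   # still 8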
What is Shuffling and how it works
Shuffling refers to the process of redistributing data across different partitions to perform operations that require data from multiple partitions to be grouped together. Shuffling is a complex operation that can significantly impact the performance of your Spark job, so understanding it is crucial.
Shuffling occurs during the following operations:
GroupBy: When grouping data by a key, data needs to be shuffled to ensure that all records with the same key end up in the same partition.
Join: When joining two datasets, Spark may need to shuffle data to align matching keys in the same partition.
ReduceByKey: When aggregating data by a key, shuffling ensures that all records with the same key are processed together.
Coalesce and Repartition: When changing the number of partitions, Spark may need to shuffle data to balance the data across the new partitions.
How Shuffling Works
Preparation: Spark identifies the need for shuffling and prepares a map task to read data from each partition.
Shuffle Write: Data is written to disk (or memory) as intermediate files. Each task writes data for each target partition separately.
Shuffle Read: During the reduce phase, Spark reads the intermediate files and groups the data by key.
Performance Considerations
Shuffling can be a performance bottleneck because it involves disk I/O, network I/O, and data serialization/deserialization. Here are some tips to optimize performance:
Avoid Unnecessary Shuffling: Minimize operations that require shuffling, such as excessive grouping, joining, and repartitioning.
Optimize Partitioning: Use partitioning strategies that reduce the need for shuffling. For example, partition data by key before performing groupBy operations.
Use Broadcast Variables: For small datasets, use broadcast variables to avoid shuffling by sending the entire dataset to all nodes.
Tune Spark Configuration: Adjust Spark configuration settings (e.g., spark.sql.shuffle.partitions) to optimize the number of partitions and memory usage.
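To illustrate the broadcast and configuration tips above, here is a minimal sketch; the lookup data is an illustrative assumption, and an active SparkSession named spark (as created in the example below) is assumed:
from pyspark.sql.functions import broadcast
# Broadcasting the small lookup table ships it to every executor, so the
# larger side is joined locally and does not need to be shuffled
large_df = spark.createDataFrame(
    [("Alice", "US"), ("Bob", "DE"), ("Carol", "US")], ["name", "country"])
small_df = spark.createDataFrame(
    [("US", "United States"), ("DE", "Germany")], ["country", "country_name"])
joined = large_df.join(broadcast(small_df), on="country")
joined.show()
# Adjusting the number of shuffle partitions (default 200) is another common lever
spark.conf.set("spark.sql.shuffle.partitions", "64")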
Example Code
Here’s an example of a shuffling operation during a groupBy operation in PySpark:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("ShufflingExample").getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4)] df = spark.createDataFrame(data, ["name", "value"])
# GroupBy operation that triggers shuffling grouped_df = df.groupBy("name").sum("value") grouped_df.show()
In this example, the groupBy operation requires shuffling to group all records with the same name together.
Shuffling is an essential part of distributed data processing in Spark, but managing it effectively is key to optimizing performance and ensuring efficient data processing.
Shuffling occurs during the following operations:
GroupBy: When grouping data by a key, data needs to be shuffled to ensure that all records with the same key end up in the same partition.
Join: When joining two datasets, Spark may need to shuffle data to align matching keys in the same partition.
ReduceByKey: When aggregating data by a key, shuffling ensures that all records with the same key are processed together.
Coalesce and Repartition: When changing the number of partitions, Spark may need to shuffle data to balance the data across the new partitions.
How Shuffling Works
Preparation: Spark identifies the need for shuffling and prepares a map task to read data from each partition.
Shuffle Write: Data is written to disk (or memory) as intermediate files. Each task writes data for each target partition separately.
Shuffle Read: During the reduce phase, Spark reads the intermediate files and groups the data by key.
Performance Considerations Shuffling can be a performance bottleneck because it involves disk I/O, network I/O, and data serialization/deserialization. Here are some tips to optimize performance:
Avoid Unnecessary Shuffling: Minimize operations that require shuffling, such as excessive grouping, joining, and repartitioning.
Optimize Partitioning: Use partitioning strategies that reduce the need for shuffling. For example, partition data by key before performing groupBy operations.
Use Broadcast Variables: For small datasets, use broadcast variables to avoid shuffling by sending the entire dataset to all nodes.
Tune Spark Configuration: Adjust Spark configuration settings (e.g., spark.sql.shuffle.partitions) to optimize the number of partitions and memory usage.
Example Code
Here’s an example of a shuffling operation during a groupBy operation in PySpark:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("ShufflingExample").getOrCreate()
# Sample data
data = [("Alice", 1), ("Bob", 2), ("Alice", 3), ("Bob", 4)] df = spark.createDataFrame(data, ["name", "value"])
# GroupBy operation that triggers shuffling grouped_df = df.groupBy("name").sum("value") grouped_df.show()
In this example, the groupBy operation requires shuffling to group all records with the same name together.
Shuffling is an essential part of distributed data processing in Spark, but managing it effectively is key to optimizing performance and ensuring efficient data processing.
Consideration for efficient and accurate data loading using PySpark
When loading data using PySpark, there are several important considerations to keep in mind to ensure efficient and accurate data processing. Here’s a checklist to guide you:
1. Understand the Data Source
Data Format: Determine the format of the data (e.g., CSV, JSON, Parquet, Avro). Each format has its own advantages and trade-offs.
Data Size: Estimate the size of the data to optimize memory and compute resources.
Schema: Define the schema (structure) of the data to ensure consistency and avoid data type mismatches.
2. Optimize Data Ingestion
Partitions: Use partitioning to divide the data into smaller, manageable chunks, which can be processed in parallel.
Sampling: For large datasets, consider sampling a subset of the data for initial processing and validation.
Compression: Use appropriate compression techniques (e.g., gzip, snappy) to reduce the size of the data and speed up processing.
3. Configuration and Resources
Cluster Configuration: Ensure that the Spark cluster is properly configured with the right amount of memory and compute resources.
Resource Allocation: Set appropriate executor memory, cores, and driver memory settings to optimize resource utilization.
Broadcast Variables: Use broadcast variables for small datasets that need to be shared across all nodes.
4. Data Cleaning and Transformation
Data Validation: Validate data for completeness, correctness, and consistency before processing.
Data Cleaning: Handle missing values, duplicates, and outliers to ensure data quality.
Schema Evolution: Manage schema changes over time to accommodate new data fields.
5. Performance Optimization
Caching: Cache intermediate data frames to speed up iterative computations.
Join Optimization: Optimize join operations by selecting the appropriate join strategy (e.g., broadcast join for small tables).
Column Pruning: Select only the necessary columns to reduce the amount of data processed.
6. Error Handling and Logging
Error Handling: Implement robust error handling to manage exceptions and failures during data loading.
Logging: Use logging to capture detailed information about the data loading process for debugging and monitoring.
7. Monitoring and Metrics
Metrics Collection: Collect metrics on data ingestion performance, such as throughput, latency, and resource utilization.
Monitoring Tools: Use monitoring tools (e.g., Spark UI, Ganglia) to track the performance and health of the Spark cluster.
Example Code
Here's a basic example of loading a CSV file using PySpark:
from pyspark.sql import SparkSession
# Initialize Spark
spark = SparkSession.builder \
    .appName("DataLoadingExample") \
    .getOrCreate()
# Load CSV file
df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)
# Show schema and data
df.printSchema()
df.show()
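As a follow-up sketch, here are a few of the checklist items applied to the same load: an explicit schema instead of inferSchema, column pruning, and caching. The column names are assumptions used only for illustration:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Explicit schema avoids a separate pass to infer types and prevents type drift
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
])
df = spark.read.csv("/path/to/data.csv", header=True, schema=schema)
# Column pruning: carry only the columns you actually need
slim_df = df.select("name", "age")
# Cache if the result is reused several times downstream
slim_df.cache()
slim_df.show()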
By considering these factors and following best practices, you can ensure efficient and reliable data loading using PySpark.
Unity Catalog Effectiveness and best practices
1. Organize Data Effectively
Catalogs: Use catalogs to group related datasets. For example, you might have separate catalogs for different departments or business units.
Schemas: Within catalogs, use schemas to further organize data. For instance, you could have schemas for different data types or use cases (e.g., raw, processed, analytics).
2. Implement Fine-Grained Access Control
Roles and Permissions: Define roles and assign appropriate permissions to ensure that users only have access to the data they need.
Row-Level Security: Use row-level security to restrict access to specific rows within a table based on user roles.
3. Enable Data Lineage
Capture Lineage: Ensure that data lineage is captured automatically to track the flow of data from its source to its final destination. This helps in auditing and troubleshooting.
Visualize Lineage: Use tools to visualize data lineage, making it easier to understand how data transforms and flows through your system.
4. Maintain Compliance and Auditability
Audit Logs: Enable auditing to capture detailed logs of data access and changes. This is crucial for compliance with data regulations.
Compliance Tags: Use compliance tags to mark sensitive data and apply appropriate access controls.
5. Optimize Performance
Partitioning: Use partitioning to optimize query performance. Partition data based on commonly queried attributes.
Caching: Implement caching strategies to improve query performance for frequently accessed data.
6. Ensure Data Quality
Data Validation: Implement data validation checks to ensure that incoming data meets quality standards.
Automated Testing: Use automated testing frameworks to validate data transformations and ensure data integrity.
7. Documentation and Data Discovery
Metadata Documentation: Document metadata for all datasets, including descriptions, data types, and relationships. This makes it easier for users to understand and use the data.
Tags and Labels: Use tags and labels to categorize and describe data assets, making them easily discoverable.
8. Monitor and Manage Usage
Usage Metrics: Track usage metrics to understand how data is being accessed and used. This can help identify popular datasets and potential performance bottlenecks.
Resource Management: Manage resources effectively to ensure that data processing tasks are optimized for performance and cost.
9. Security Best Practices
Encryption: Encrypt data at rest and in transit to protect sensitive information.
Access Reviews: Regularly review access permissions to ensure that they are up-to-date and aligned with business requirements.
10. Training and Support
User Training: Provide training to users on how to use Unity Catalog effectively. This includes understanding data organization, access controls, and data discovery tools.
Support Infrastructure: Set up a support infrastructure to address user queries and issues related to Unity Catalog.
By following these best practices, you can ensure that Unity Catalog is used effectively to manage and govern your data assets, leading to better data quality, compliance, and performance.
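As a hedged sketch of the access-control practices above, run from a notebook: the catalog, schema, table, column, and group names (main, hr, employees, region, analysts, hr_admins) are illustrative assumptions, and the row-filter syntax should be checked against your Databricks runtime:
# Grant read access to a group (assumed group name)
spark.sql("GRANT SELECT ON TABLE main.hr.employees TO `analysts`")
# Row-level security: HR admins see all rows, everyone else only the EMEA region
spark.sql("""
CREATE OR REPLACE FUNCTION main.hr.region_filter(region STRING)
RETURN is_account_group_member('hr_admins') OR region = 'EMEA'
""")
spark.sql("ALTER TABLE main.hr.employees SET ROW FILTER main.hr.region_filter ON (region)")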
When and how to use Unity Catalog
When to Use Unity Catalog
Data Governance: When you need to enforce data governance policies across multiple workspaces and cloud platforms.
Compliance: When you need to maintain compliance with data regulations by tracking data access and usage.
Collaboration: When multiple teams need to collaborate on data projects and require a unified view of data assets.
Data Discovery: When you need to enable data consumers to easily find and access the data they need.
How to Use Unity Catalog
Set Up Unity Catalog: Attach a Unity Catalog metastore to your Databricks workspace. This metastore will register metadata about your data and AI assets.
Organize Data Assets: Use catalogs, schemas, and tables to organize your data assets. Catalogs often mirror organizational units or software development lifecycle scopes.
Define Access Controls: Set up fine-grained access controls using Unity Catalog's security model, which is based on standard ANSI SQL.
Enable Data Lineage and Auditing: Configure Unity Catalog to capture lineage data and user-level audit logs.
Tag and Document Data Assets: Use Unity Catalog's tagging and documentation features to make data assets easily discoverable.
Integrate with Other Tools: Leverage Unity Catalog's interoperability to integrate with various data and AI platforms, ensuring seamless data management.
By using Unity Catalog, you can centralize data governance, ensure compliance, and enable efficient collaboration across your data and AI projects.
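A short sketch of the setup and organization steps, issued as SQL from a notebook; the catalog, schema, table, and group names (sales_cat, bronze, orders, data_engineers) are illustrative assumptions:
# Organize assets: catalog -> schema -> table
spark.sql("CREATE CATALOG IF NOT EXISTS sales_cat")
spark.sql("CREATE SCHEMA IF NOT EXISTS sales_cat.bronze")
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales_cat.bronze.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_amount DOUBLE
    )
""")
# Fine-grained access control using standard SQL GRANT statements
spark.sql("GRANT USE CATALOG ON CATALOG sales_cat TO `data_engineers`")
spark.sql("GRANT SELECT ON SCHEMA sales_cat.bronze TO `data_engineers`")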
Unity Catalog
Unity Catalog is a unified governance solution for data and AI assets on Databricks. Here are some of its key features and how it can be used in projects:
Key Features of Unity Catalog
Centralized Metadata Management: Unity Catalog provides a centralized repository for managing metadata, making it easier to organize and manage data assets.
Fine-Grained Access Control: It allows organizations to enforce fine-grained access controls on their data assets, ensuring that only authorized users can access sensitive data.
Data Lineage Tracking: Unity Catalog automatically captures lineage data, tracking how data assets are created and used across different languages and tools.
Built-in Auditing: It captures user-level audit logs that record access to data, helping organizations maintain compliance with data regulations.
Data Discovery: Unity Catalog lets you tag and document data assets, providing a search interface to help data consumers find the data they need.
Cross-Cloud Data Governance: It supports governance across multiple cloud platforms, ensuring consistent data policies and access controls.
Interoperability: Unity Catalog supports various data formats and integrates with multiple data and AI platforms, promoting interoperability and flexibility.
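To make the data discovery features concrete, here is a hedged sketch of documenting and tagging a table from a notebook; the table name, column name, and tag keys are illustrative assumptions:
# Add a table-level description that appears in Catalog Explorer search
spark.sql("COMMENT ON TABLE main.sales.orders IS 'Raw orders ingested daily from the web shop'")
# Document an individual column
spark.sql("ALTER TABLE main.sales.orders ALTER COLUMN customer_id COMMENT 'Foreign key to main.sales.customers'")
# Tag the table so it can be filtered and discovered by tag
spark.sql("ALTER TABLE main.sales.orders SET TAGS ('domain' = 'sales', 'contains_pii' = 'false')")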
Wednesday, February 19, 2025
Why & when to use Auto Loader
Auto Loader in Databricks is primarily designed for processing new files as they arrive in cloud storage, making it an excellent choice for handling data sources that generate log files and store them in cloud storage (e.g., S3, ADLS, Google Cloud Storage). However, it is not designed to directly handle streaming live data from sources like log file generators that continuously produce data without storing it in files.
When you specify the format as "cloudFiles" in your readStream operation, it indicates that you're using Databricks Auto Loader. Auto Loader automatically processes new data files as they arrive in your cloud storage and handles schema evolution and checkpointing to ensure reliable and scalable ingestion of streaming data.
When to Use Auto Loader:
Cloud Storage: When log files are being generated and stored in cloud storage, Auto Loader can efficiently process these files as they arrive.
Batch Processing: Auto Loader can be used for near real-time batch processing of log files, ensuring that new files are ingested and processed as they appear in cloud storage.
Alternatives for Streaming Live Data: For processing streaming live data directly from sources like log file generators, you can use Spark Structured Streaming with different input sources:
Kafka:
Apache Kafka is a popular choice for ingesting and processing streaming data. You can set up Kafka to receive log data from log file generators and then use Spark Structured Streaming to process the data in real-time.
streaming_df = spark.readStream.format("kafka") \ .option("kafka.bootstrap.servers", "host:port") \ .option("subscribe", "topic") \ .load()
# Process the streaming data
transformed_df = streaming_df.selectExpr("CAST(value AS STRING) as message")
# Write the streaming DataFrame to a Delta table
query = transformed_df.writeStream.format("delta") \ .outputMode("append") \ .option("checkpointLocation", "/path/to/checkpoint/dir") \ .start("/path/to/delta/table")
query.awaitTermination()
Socket Source:
If you have a simple use case where data is sent over a network socket, you can use the socket source in Spark Structured Streaming.
streaming_df = spark.readStream.format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load()
# Process the streaming data
transformed_df = streaming_df.selectExpr("CAST(value AS STRING) as message")
# Write the streaming DataFrame to a Delta table
query = transformed_df.writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .start("/path/to/delta/table")
query.awaitTermination()
File Source with Auto Loader:
If your log file generator stores the logs in cloud storage, you can use Auto Loader to process them.
streaming_df = spark.readStream.format("cloudFiles") \ .option("cloudFiles.format", "json") \ .option("cloudFiles.schemaLocation", "/path/to/checkpoint/schema") \ .load("s3://your-bucket/path/to/logs")
# Process the streaming data
transformed_df = streaming_df.select("timestamp", "log_level", "message")
# Write the streaming DataFrame to a Delta table
query = transformed_df.writeStream.format("delta") \ .outputMode("append") \ .option("checkpointLocation", "/path/to/checkpoint/dir") \ .start("/path/to/delta/table")
query.awaitTermination()
By choosing the appropriate method based on your data source and requirements, you can effectively handle and process streaming live data or log files using Spark Structured Streaming and Databricks.