Thursday, March 6, 2025

Read a CSV file with PySpark and plot a graph of the data

from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import os
import time
import glob
import argparse

# Build (or reuse) the shared SparkSession used by every function below.
spark = (
    SparkSession.builder
    .appName("Employee Graph")
    .getOrCreate()
)
def read_csv_file(file_path):
    """Load an employee CSV and return per-department headcounts.

    Args:
        file_path: Path to a CSV file with a header row that includes a
            ``department`` column.

    Returns:
        pandas.DataFrame with ``department`` and ``count`` columns.
    """
    # Read the CSV with its header row, letting Spark infer column types.
    employees_df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Expose the data to Spark SQL under a temporary view name.
    employees_df.createOrReplaceTempView("employees")

    # Aggregate headcount per department, then pull the (small) result
    # back to the driver as a pandas DataFrame for plotting.
    query = "SELECT department, COUNT(*) as count FROM employees GROUP BY department"
    return spark.sql(query).toPandas()

def create_bar_chart(department_counts):
    """Render a bar chart of employee headcount per department.

    Args:
        department_counts: pandas DataFrame with ``department`` and
            ``count`` columns, as produced by ``read_csv_file``.
    """
    departments = department_counts['department']
    headcounts = department_counts['count']
    plt.bar(departments, headcounts)
    plt.title('Employee Distribution by Department')
    plt.xlabel('Department')
    plt.ylabel('Number of Employees')
    # Blocks until the window is closed when run interactively.
    plt.show()

def create_pie_chart(department_counts):
    """Render a pie chart of the department share of total headcount.

    Args:
        department_counts: pandas DataFrame with ``department`` and
            ``count`` columns, as produced by ``read_csv_file``.
    """
    labels = department_counts['department']
    sizes = department_counts['count']
    # autopct prints each slice's percentage with one decimal place.
    plt.pie(sizes, labels=labels, autopct='%1.1f%%')
    plt.title('Employee Distribution by Department')
    plt.show()

def add_employee_to_db(df):
    """Write the employee DataFrame to a local SQLite database over JDBC.

    NOTE(review): this needs the SQLite JDBC driver jar on Spark's
    classpath, and ``save()`` with the default write mode presumably
    errors out if the ``employees`` table already exists — confirm the
    intended overwrite/append semantics with the caller.
    """
    jdbc_options = {
        "url": "jdbc:sqlite:employees.db",
        "driver": "org.sqlite.JDBC",
        "dbtable": "employees",
    }
    df.write.format("jdbc").options(**jdbc_options).save()

def main():
    """Poll the working directory for CSV files every 60 seconds, chart
    the most recently modified one, and mirror its rows into SQLite.

    Command line:
        --chart {bar,pie}   Optional; when omitted no chart is drawn,
                            but the database write still happens.
    """
    parser = argparse.ArgumentParser(description='Employee Graph')
    parser.add_argument('--chart', choices=['bar', 'pie'], help='Type of chart to create')
    args = parser.parse_args()

    try:
        while True:
            csv_files = glob.glob('*.csv')
            if csv_files:
                # glob order is arbitrary (filesystem-dependent), so
                # csv_files[-1] is not "the latest"; pick the file with
                # the newest modification time instead.
                latest_csv = max(csv_files, key=os.path.getmtime)

                department_counts = read_csv_file(latest_csv)
                if args.chart == 'bar':
                    create_bar_chart(department_counts)
                elif args.chart == 'pie':
                    create_pie_chart(department_counts)

                # Mirror the raw rows into the SQLite database.
                # NOTE(review): this re-writes the same file each cycle
                # until a newer CSV appears — confirm that is intended.
                df = spark.read.csv(latest_csv, header=True, inferSchema=True)
                add_employee_to_db(df)
            time.sleep(60)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the intended way to stop the polling loop.
    finally:
        spark.stop()  # release Spark driver resources on exit

# Run the polling loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Note that this code uses the pyspark.sql module to read and aggregate the CSV data, and the matplotlib library to create the charts. The add_employee_to_db function writes the DataFrame to a SQLite database through Spark's jdbc data source; for this to work, the SQLite JDBC driver jar must be available on Spark's classpath (e.g. via the --jars option of spark-submit).

No comments:

Post a Comment

Data synchronization in Lakehouse

Data synchronization in Lakebase ensures that transactional data and analytical data remain up-to-date across the lakehouse and Postgres d...