from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import os
import time
import glob
import argparse
# Create SparkSession
spark = SparkSession.builder.appName("Employee Graph").getOrCreate()
def read_csv_file(file_path):
    # Read CSV file into DataFrame
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    # Register DataFrame as a temporary view
    df.createOrReplaceTempView("employees")
    # Count employees by department
    department_counts = spark.sql("SELECT department, COUNT(*) as count FROM employees GROUP BY department").toPandas()
    return department_counts

def create_bar_chart(department_counts):
    plt.bar(department_counts['department'], department_counts['count'])
    plt.xlabel('Department')
    plt.ylabel('Number of Employees')
    plt.title('Employee Distribution by Department')
    plt.show()

def create_pie_chart(department_counts):
    plt.pie(department_counts['count'], labels=department_counts['department'], autopct='%1.1f%%')
    plt.title('Employee Distribution by Department')
    plt.show()

def add_employee_to_db(df):
    # Write DataFrame to a SQLite database over JDBC (a SQLite JDBC driver jar must be on the classpath);
    # append so repeated runs do not fail once the table already exists
    df.write.format("jdbc").option("url", "jdbc:sqlite:employees.db").option("driver", "org.sqlite.JDBC").option("dbtable", "employees").mode("append").save()

def main():
    parser = argparse.ArgumentParser(description='Employee Graph')
    parser.add_argument('--chart', choices=['bar', 'pie'], help='Type of chart to create')
    args = parser.parse_args()
    while True:
        csv_files = glob.glob('*.csv')
        if len(csv_files) > 0:
            department_counts = read_csv_file(csv_files[-1])
            if args.chart == 'bar':
                create_bar_chart(department_counts)
            elif args.chart == 'pie':
                create_pie_chart(department_counts)
            # Add employees to SQLite database
            df = spark.read.csv(csv_files[-1], header=True, inferSchema=True)
            add_employee_to_db(df)
        time.sleep(60)

if __name__ == "__main__":
    main()
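The script assumes every CSV file dropped into the working directory has at least a department column (any other columns are simply carried through to the SQLite table). A purely hypothetical input file could look like:

name,department
Alice,Engineering
Bob,Engineering
Carol,Sales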
Note that this code uses the pyspark.sql module to read and aggregate the CSV data, and the matplotlib library to create the charts. The add_employee_to_db function writes the DataFrame to a SQLite database through Spark's JDBC data source.
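One caveat: Spark does not ship with a SQLite JDBC driver, so the JDBC write above only works if a driver jar (for example, org.xerial:sqlite-jdbc) is supplied to the job, e.g. through spark-submit --jars or --packages; that is an assumption about your environment, not something the script sets up. If pulling in a driver is not worth it for a small employees table, a minimal alternative sketch is to collect the DataFrame to pandas and write it with Python's built-in sqlite3 module. The add_employee_to_db_pandas name and the assumption that the data fits in driver memory are illustrative, not part of the original code:

import sqlite3

def add_employee_to_db_pandas(df, db_path="employees.db"):
    # Collect the Spark DataFrame into pandas; fine for a small employee table,
    # not advisable for data that does not fit in driver memory
    pandas_df = df.toPandas()
    with sqlite3.connect(db_path) as conn:
        # Append to the 'employees' table, creating it on first use;
        # the with-block commits the transaction on exit
        pandas_df.to_sql("employees", conn, if_exists="append", index=False)

With this in place, main() could call add_employee_to_db_pandas(df) instead of the JDBC-based add_employee_to_db(df).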