Building a Data Engineering project with AWS EC2, Parameter Store, Snowflake, and Streamlit
I wanted to do a small project that mixed Data Engineering and Data Science. I took a dataset from Kaggle containing bio-signal data for individual patients along with whether they smoke.
There are multiple parts to this project.
1. Data Ingestion of the CSV file into a Snowflake table
For this, I used the following:
- Parameter Store from AWS Systems Manager to store the Snowflake credentials
- A Python script (parameters.py) for storing and retrieving the Snowflake credentials
- Another Python script (first_load.py) that ingests the data into the Snowflake table
Code for getting the parameters:
import logging

import boto3

logger = logging.getLogger(__name__)

def get_parameters(parameter_name: str) -> str:
    """
    Get a parameter value from AWS Parameter Store.

    Takes the parameter name as input, connects via a boto3 SSM client,
    and returns the value from get_parameter.
    """
    try:
        session = boto3.session.Session(region_name='us-west-2')
        # Create a boto3 client for AWS Systems Manager Parameter Store
        ssm = session.client("ssm")
        # Retrieve the parameter from Parameter Store
        response = ssm.get_parameter(
            Name=parameter_name,
            WithDecryption=False
        )
        # Extract the value of the parameter
        param_value = response["Parameter"]["Value"]
        return param_value
    except Exception as e:
        logger.error(e)
        raise e
The above script retrieves the Snowflake user_name, password, and account_number identifier from the Parameter Store.
Code for ingesting the data into Snowflake:
import argparse
import logging

import pandas as pd
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

import parameters

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()
parser.add_argument('--database')
parser.add_argument('--schema')
parser.add_argument('--role')
parser.add_argument('--warehouse')
parser.add_argument('--table', default='smoking')
args = parser.parse_args()

COLUMNS = ['age', 'height(cm)', 'weight(kg)', 'waist(cm)', 'fasting blood sugar',
           'Cholesterol', 'hemoglobin', 'Urine protein', 'serum creatinine',
           'smoking']

def ingest_data():
    """
    Ingest data into Snowflake.

    Uses the following command line arguments:
    1. database: Snowflake database name
    2. schema: Snowflake schema name
    3. role: Snowflake role name
    4. warehouse: Snowflake warehouse name
    5. table: Snowflake table name

    The user, password, and account are read from the AWS Parameter Store
    via parameters.get_parameters.
    """
    try:
        engine = create_engine(URL(
            user=parameters.get_parameters('user_name'),
            password=parameters.get_parameters('password'),
            account=parameters.get_parameters('account_number'),
            database=args.database,
            schema=args.schema,
            role=args.role,
            warehouse=args.warehouse))
        conn = engine.connect()
        # Sanity check that the connection works
        conn.execute("SELECT current_version()").fetchone()
        df_chunks = pd.read_csv('./data/train_dataset.csv', chunksize=1000)
        for i, df_chunk in enumerate(df_chunks):
            df_chunk = df_chunk[COLUMNS]
            # The first chunk creates (or replaces) the table; later chunks append
            if i == 0:
                df_chunk.to_sql(args.table, con=conn, index=False, if_exists='replace')
            else:
                df_chunk.to_sql(args.table, con=conn, index=False, if_exists='append')
        conn.close()
        engine.dispose()
    except Exception as e:
        logger.error(e)
        raise e
This Python function ingests the data into the Snowflake table called ‘smoking’, using the snowflake.sqlalchemy library.
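The chunked replace-then-append pattern above can be tried without a Snowflake account. Here is a self-contained sketch against an in-memory SQLite database; the toy two-column CSV is something I made up for illustration, standing in for the Kaggle file.

```python
import io

import pandas as pd
from sqlalchemy import create_engine

# Toy CSV standing in for ./data/train_dataset.csv (made up for illustration)
csv_text = "age,smoking\n" + "\n".join(f"{20 + i % 60},{i % 2}" for i in range(2500))

# In-memory SQLite stands in for the Snowflake connection
engine = create_engine("sqlite://")

for i, chunk in enumerate(pd.read_csv(io.StringIO(csv_text), chunksize=1000)):
    # First chunk creates/replaces the table; later chunks append to it
    chunk.to_sql("smoking", con=engine, index=False,
                 if_exists="replace" if i == 0 else "append")

total = int(pd.read_sql("SELECT COUNT(*) AS n FROM smoking", engine)["n"][0])
print(total)  # 2500 rows land in the table, loaded 1000 at a time
```

Because the first chunk uses `if_exists="replace"`, rerunning the load starts the table fresh instead of duplicating rows.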
Training the Model
I decided to use a Random Forest classifier for the ML model. Honestly, the data I got from Kaggle was already clean, and I was more interested in the data engineering aspect of the project, so I kept this part simple. I got around 77% accuracy, which was good enough for me.
Code for training the model and saving it:
import os

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def create_model() -> bool:
    """Create a Random Forest Classifier model and save it as tree_model.joblib.

    Returns:
        True: If the model is created and saved successfully
    Raises:
        The underlying exception if training or saving fails
    """
    try:
        # Loading the data, keeping only the features and the target column
        df_raw = pd.read_csv("./data/train_dataset.csv")
        df_raw = df_raw[['age', 'height(cm)', 'weight(kg)', 'waist(cm)', 'fasting blood sugar',
                         'Cholesterol', 'hemoglobin', 'Urine protein', 'serum creatinine',
                         'smoking']]
        # Loading the classifier
        model = RandomForestClassifier(n_estimators=100)
        # Training the model
        model.fit(df_raw.iloc[:, :-1], df_raw['smoking'])
        # Saving the model
        os.makedirs('./model', exist_ok=True)
        filename = './model/tree_model.joblib'
        joblib.dump(model, filename)
        return True
    except Exception as e:
        raise e
The above code trains a Random Forest classifier model and saves it using the joblib library.
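The Streamlit app further down calls model.load_predict, which the article doesn't show. A plausible sketch is below: load the saved classifier with joblib and predict on a one-row DataFrame. To keep the example self-contained it trains and saves a toy one-feature model in the current directory rather than reading the real ./model/tree_model.joblib; the helper's exact signature is my assumption.

```python
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

def load_predict(df: pd.DataFrame, path: str = "tree_model.joblib") -> int:
    """Load the saved classifier and return the 0/1 smoking prediction
    for a single-row DataFrame (hypothetical sketch of model.load_predict)."""
    model = joblib.load(path)
    return int(model.predict(df)[0])

# Train and save a toy one-feature model so the sketch runs end to end
toy = pd.DataFrame({"hemoglobin": [12.0, 16.5, 11.8, 17.0],
                    "smoking":    [0,    1,    0,    1]})
clf = RandomForestClassifier(n_estimators=10, random_state=0)
clf.fit(toy[["hemoglobin"]], toy["smoking"])
joblib.dump(clf, "tree_model.joblib")

status = load_predict(pd.DataFrame({"hemoglobin": [16.8]}))
```

The input DataFrame must carry the same column names, in the same order, as the frame used for training, or scikit-learn will complain about mismatched feature names.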
Deploying on AWS EC2 using Streamlit:
I created a web app on top of the trained model and deployed it on EC2. For those who are not familiar with Streamlit, it's an awesome library that lets you create a web app in a few lines of code with almost zero HTML and CSS.
Streamlit code:
import time

import pandas as pd
import streamlit as st

import inserting_data
import model

st.set_page_config(page_title="Smoking Prediction",
                   page_icon=":guardsman:")
st.title("Smoking Status Detection")
st.subheader("An End-End Data Engineering Project by Siva Chandan")

with st.form("input_df"):
    Age = st.number_input("Age", min_value=21, max_value=98,
                          help="Enter your age here (the patient has to be 21 to use this)")
    height = st.number_input("Height(cm)", min_value=54, max_value=280,
                             value=165, help="Enter the height in cm")
    weight = st.number_input("Weight(kg)", min_value=30.0, max_value=700.00,
                             value=65.00, step=0.1, help="Enter the weight in kg")
    waist = st.number_input("waist(cm)", min_value=51.0, max_value=129.0,
                            value=82.0, step=0.1)
    fasting_blood_sugar = st.number_input("fasting blood sugar", value=96.00,
                                          min_value=46.00, max_value=423.00, step=0.1,
                                          help="Enter the fasting blood sugar value")
    cholesterol = st.number_input("cholesterol", min_value=55.00, max_value=445.00,
                                  value=195.0, step=0.1, help="Enter the cholesterol value")
    hemoglobin = st.number_input("hemoglobin", min_value=4.90, max_value=21.1,
                                 value=14.8, step=0.1, help="Enter the hemoglobin level")
    urine_protein = st.number_input("Urine protein", min_value=1, max_value=6,
                                    value=1, step=1, help="Enter the urine protein")
    serum_creatinine = st.number_input("serum creatinine", min_value=0.1, max_value=1.0,
                                       value=0.9, step=0.01, help="Enter the serum creatinine")
    answer = st.radio("Add the record to the database", ("Yes", "No"))
    submit_function = st.form_submit_button("Submit")

if submit_function:
    df = pd.DataFrame({
        'age': [int(Age)], 'height(cm)': [int(height)], 'weight(kg)': [float(weight)],
        'waist(cm)': [float(waist)], 'fasting blood sugar': [float(fasting_blood_sugar)],
        'Cholesterol': [float(cholesterol)], 'hemoglobin': [float(hemoglobin)],
        'Urine protein': [int(urine_protein)], 'serum creatinine': [float(serum_creatinine)]
    })
    st.dataframe(df)
    smoking_status = model.load_predict(df)
    # Include the Font Awesome CSS file so the icons below render
    st.markdown("<link rel='stylesheet' href='https://use.fontawesome.com/releases/v5.14.0/css/all.css' integrity='sha384-HzLeBuhoNPvSl5KYnjx0BT+WB0QEEqLprO+NBkkk5gbc67FTaL7XIGa2w1L0Xbgc' crossorigin='anonymous'>", unsafe_allow_html=True)
    if smoking_status == 1:
        # Display the smoking icon
        st.markdown("<p><strong>The Patient is Smoking <i class='fas fa-smoking'></i></strong></p>", unsafe_allow_html=True)
    elif smoking_status == 0:
        # Display the non-smoking icon
        st.markdown("<p><strong>The Patient is Not Smoking <i class='fas fa-smoking-ban'></i></strong></p>", unsafe_allow_html=True)
    if answer == 'Yes':
        with st.spinner('Adding Data'):
            st.dataframe(df)
            inserting_data.inserting_data(df, 'smoking', 'public', 'accountadmin', 'compute_wh')
        st.success('Record Appended', icon="✅")
    elif answer == "No":
        with st.spinner('Reloading the webapp'):
            time.sleep(1.5)
        st.experimental_rerun()
As of now, the web app is live. You can visit it at this URL
Code for inserting a new row into the Snowflake table:
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

import parameters

def inserting_data(df, database: str, schema: str,
                   role: str, warehouse: str) -> bool:
    """
    Insert a record into Snowflake.

    Args:
        df: DataFrame holding the record to insert
        database: Snowflake database name
        schema: Snowflake schema name
        role: Snowflake role name
        warehouse: Snowflake warehouse name

    The user, password, and account are read from the AWS Parameter Store
    via parameters.get_parameters.
    """
    try:
        engine = create_engine(URL(
            user=parameters.get_parameters('user_name'),
            password=parameters.get_parameters('password'),
            account=parameters.get_parameters('account_number'),
            database=database,
            schema=schema,
            role=role,
            warehouse=warehouse))
        conn = engine.connect()
        # Sanity check that the connection works
        conn.execute("SELECT current_version()").fetchone()
        df.to_sql('smoking', con=conn, index=False, if_exists='append')
        conn.close()
        engine.dispose()
        return True
    except Exception as e:
        raise e
For more info on how to deploy this code, and to understand it better, please visit my GitHub repo.
I would like to thank you for taking the time to read my article. I hope that it has provided you with valuable insights and information. If you found my article to be informative and useful, I would greatly appreciate it if you could follow me, share it with your network, and give it a clap. Your support and engagement are essential to my continued growth as a writer and I am always grateful for the opportunity to connect with my readers. Thank you for your time and consideration.
Thank you.