Building an ML application using MLlib in PySpark

This tutorial walks you through creating ML models with MLlib in Apache Spark and interacting with them.

Yashwanth Madaka
[Figure: Spark Stack]
[Figure: Stroke Dataset]
[Figure: The architecture diagram of our project]
[Figure: Google Cloud Console – Dataproc]
[Figures: GC — Creating a cluster, steps 1–3]
[Figure: Google Cloud — Dataproc clusters]
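The screenshots above show the cluster being created through the console UI. The same cluster can also be created from the gcloud CLI; the command below is only an illustrative sketch (cluster name, region, and machine types are placeholders, not the exact configuration used here):

gcloud dataproc clusters create stroke-cluster \
    --region=us-central1 \
    --master-machine-type=n1-standard-4 \
    --worker-machine-type=n1-standard-4 \
    --num-workers=2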
sudo nano ~/.jupyter/jupyter_notebook_config.py
c = get_config()
c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 5000
jupyter-notebook --no-browser --port=5000
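The notebook now listens on port 5000 of the master node. To reach it from your own machine, one common option is an SSH tunnel via gcloud (the instance name and zone below are placeholders); the notebook is then available at http://localhost:5000:

gcloud compute ssh cluster-name-m --zone=us-central1-a -- -N -L 5000:localhost:5000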
sudo -i
conda install -c glemaitre imbalanced-learn
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import avg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN
from sklearn.model_selection import train_test_split
from collections import Counter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
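As an aside, on recent Spark versions the session is usually created through the builder API; a minimal equivalent sketch (the app name is arbitrary):

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('stroke-prediction') \
    .getOrCreate()
sc = spark.sparkContext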
input_dir = 'gs://data-stroke-1/'
df = spark.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load(input_dir + 'stroke.csv')
df.columns
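Before going further, it is worth confirming that the schema was inferred correctly and eyeballing a few rows:

df.printSchema()  # numeric columns should not have been inferred as strings
df.show(5)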
featureColumns = ['gender', 'age', 'diabetes', 'hypertension',
                  'heart disease', 'smoking history', 'BMI']
df = df.filter(df.age > 2)
df.count()
responses = df.groupBy('stroke').count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]

ind = np.array(range(len(categories)))
width = 0.35
plt.bar(ind, counts, width=width, color='r')

plt.ylabel('counts')
plt.title('Stroke')
plt.xticks(ind + width/2., categories)

The age-group BMI values used for imputation below are taken from https://dqydj.com/bmi-distribution-by-age-calculator-for-the-united-states/
imputeDF = df
imputeDF_Pandas = imputeDF.toPandas()
df_2_9 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 2) & (imputeDF_Pandas['age'] <= 9)]
values = {'smoking history': 0, 'BMI': 17.125}
df_2_9 = df_2_9.fillna(value=values)
df_10_13 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 10) & (imputeDF_Pandas['age'] <= 13)]
values = {'smoking history': 0, 'BMI': 19.5}
df_10_13 = df_10_13.fillna(value=values)
df_14_17 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 14) & (imputeDF_Pandas['age'] <= 17)]
values = {'smoking history': 0, 'BMI': 23.05}
df_14_17 = df_14_17.fillna(value=values)
df_18_24 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 18) & (imputeDF_Pandas['age'] <= 24)]
values = {'smoking history': 0, 'BMI': 27.1}
df_18_24 = df_18_24.fillna(value=values)
df_25_29 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 25) & (imputeDF_Pandas['age'] <= 29)]
values = {'smoking history': 0, 'BMI': 27.9}
df_25_29 = df_25_29.fillna(value=values)
df_30_34 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 30) & (imputeDF_Pandas['age'] <= 34)]
values = {'smoking history': 0.25, 'BMI': 29.6}
df_30_34 = df_30_34.fillna(value=values)
df_35_44 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 35) & (imputeDF_Pandas['age'] <= 44)]
values = {'smoking history': 0.25, 'BMI': 30.15}
df_35_44 = df_35_44.fillna(value=values)
df_45_49 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 45) & (imputeDF_Pandas['age'] <= 49)]
values = {'smoking history': 0, 'BMI': 29.7}
df_45_49 = df_45_49.fillna(value=values)
df_50_59 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 50) & (imputeDF_Pandas['age'] <= 59)]
values = {'smoking history': 0, 'BMI': 29.95}
df_50_59 = df_50_59.fillna(value=values)
df_60_74 = imputeDF_Pandas[(imputeDF_Pandas['age'] >= 60) & (imputeDF_Pandas['age'] <= 74)]
values = {'smoking history': 0, 'BMI': 30.1}
df_60_74 = df_60_74.fillna(value=values)
df_75_plus = imputeDF_Pandas[imputeDF_Pandas['age'] >= 75]  # >= 75, so rows with age exactly 75 are not dropped
values = {'smoking history': 0, 'BMI': 28.1}
df_75_plus = df_75_plus.fillna(value=values)
all_frames = [df_2_9, df_10_13, df_14_17, df_18_24, df_25_29, df_30_34, df_35_44, df_45_49, df_50_59, df_60_74, df_75_plus]
df_combined = pd.concat(all_frames)
df_combined_converted = spark.createDataFrame(df_combined)
imputeDF = df_combined_converted
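As a quick sanity check that the age-group imputation left no missing values, count the nulls per column using standard pyspark.sql functions:

from pyspark.sql.functions import col, sum as sql_sum
imputeDF.select([sql_sum(col(c).isNull().cast('int')).alias(c) for c in imputeDF.columns]).show()  # every count should be 0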
imputeDF_Pandas = imputeDF.toPandas()  # convert once and reuse
X = imputeDF_Pandas.filter(items=['gender', 'age', 'diabetes', 'hypertension', 'heart disease', 'smoking history', 'BMI'])
Y = imputeDF_Pandas.filter(items=['stroke'])
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)
sm = SMOTE(random_state=12, ratio='auto', kind='regular')
x_train_res, y_train_res = sm.fit_sample(X_train, Y_train)
print('Resampled dataset shape {}'.format(Counter(y_train_res)))
dataframe_1 = pd.DataFrame(x_train_res, columns=['gender', 'age', 'diabetes', 'hypertension', 'heart disease', 'smoking history', 'BMI'])
dataframe_2 = pd.DataFrame(y_train_res, columns=['stroke'])
result = dataframe_1.combine_first(dataframe_2)
imputeDF_1 = spark.createDataFrame(result)
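To verify that SMOTE balanced the classes, compare the label counts after resampling:

imputeDF_1.groupBy('stroke').count().show()  # the two classes should now be roughly equal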
Spark ML Pipeline
binarizer = Binarizer(threshold=0.0, inputCol="stroke", outputCol="label")
binarizedDF = binarizer.transform(imputeDF_1)
binarizedDF = binarizedDF.drop(‘stroke’)
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)
print(assembled)
(trainingData, testData) = assembled.randomSplit([0.7, 0.3], seed=0)
print("Distribution of Ones and Zeros in trainingData is: ", trainingData.groupBy("label").count().take(3))
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=25, minInstancesPerNode=30, impurity="gini")
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
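The MulticlassClassificationEvaluator imported earlier gives a quick accuracy number on the held-out test split:

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
print('Test accuracy: %g' % evaluator.evaluate(predictions))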
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
results = predictions.select(['probability', 'label'])

## prepare score-label set
results_collect = results.collect()
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]
scoreAndLabels = sc.parallelize(results_list)

metrics = metric(scoreAndLabels)
print("Test Data Aread under ROC score is : ", metrics.areaUnderROC)

from sklearn.metrics import roc_curve, auc

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

%matplotlib inline
plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) curve')
plt.legend(loc="lower right")
plt.show()

[Figure: AUC — ROC curve]
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
pyspark --jars /home/yashwanthmadaka_imp24/jpmml-sparkml-executable-1.5.3.jar
trainingData = trainingData.drop("features")
from pyspark.ml.feature import RFormula
formula = RFormula(formula = "label ~ .")
classifier = DecisionTreeClassifier(maxDepth=25, minInstancesPerNode=30, impurity="gini")
pipeline = Pipeline(stages = [formula, classifier])
pipelineModel = pipeline.fit(trainingData)
from pyspark2pmml import PMMLBuilder
pmmlBuilder = PMMLBuilder(sc, trainingData, pipelineModel) \
    .putOption(classifier, "compact", True)
pmmlBuilder.buildFile("dt-stroke.pmml")
cd openscoring-server/target
java -jar openscoring-server-executable-2.0-SNAPSHOT.jar
cd openscoring-client/target
java -cp openscoring-client-executable-2.0-SNAPSHOT.jar org.openscoring.client.Deployer --model http://localhost:8080/openscoring/model/stroke --file dt-stroke.pmml
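Once deployed, Openscoring serves the model over REST, so a single record can be scored with curl. The sketch below assumes the standard Openscoring evaluation endpoint; the field values are purely illustrative:

curl -X POST http://localhost:8080/openscoring/model/stroke \
    -H "Content-Type: application/json" \
    -d '{"id": "example-1", "arguments": {"gender": 1, "age": 67, "diabetes": 0, "hypertension": 1, "heart disease": 0, "smoking history": 1, "BMI": 28.1}}'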
npm install
npm start
[Figure: ReactJS Frontend]