ICP 14: Apache Spark MLIB - acikgozmehmet/BigDataProgramming GitHub Wiki

ICP 14: Apache Spark MLlib

Objectives

  1. Clustering
  2. Classification
  3. Regression
  4. Recommendation

Overview

MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

  • ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
  • Featurization: feature extraction, transformation, dimensionality reduction, and selection
  • Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
  • Persistence: saving and loading algorithms, models, and Pipelines
  • Utilities: linear algebra, statistics, data handling, etc.

Spark MLlib is Apache Spark's machine learning component. One of the major attractions of Spark is its ability to scale computation massively, which is exactly what machine learning algorithms need. The limitation is that not all machine learning algorithms can be parallelized effectively. Each algorithm has its own challenges for parallelization, whether through task parallelism or data parallelism.
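
As a rough illustration of data parallelism, the plain-Python sketch below computes a mean by splitting the data into chunks, computing a partial result per chunk, and merging the partial results. It does not use Spark; the helper names (`partition`, `partial_stats`, `merge`) are hypothetical, and the chunks only stand in for Spark partitions.

```python
from functools import reduce


def partition(rows, num_partitions):
    """Split rows into roughly equal chunks (a stand-in for Spark partitions)."""
    return [rows[i::num_partitions] for i in range(num_partitions)]


def partial_stats(chunk):
    """Per-chunk work that needs no data from other chunks (the map step)."""
    return (sum(chunk), len(chunk))


def merge(a, b):
    """Combine two partial results (the reduce step)."""
    return (a[0] + b[0], a[1] + b[1])


values = [float(v) for v in range(1, 101)]
chunks = partition(values, 4)
total, count = reduce(merge, map(partial_stats, chunks))
mean = total / count
print(mean)
```

The mean parallelizes well because each chunk can be processed independently and the partial results merge with a simple sum. Algorithms whose steps depend on each other sequentially do not decompose this cleanly.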

Having said that, Spark is becoming the de facto platform for building machine learning algorithms and applications. The developers working on Spark MLlib keep implementing more machine learning algorithms in a scalable and concise manner within the Spark framework. This page applies Spark MLlib to classification, clustering, and regression problems.

Installation Requirements

  1. PySpark 2.1.0 is used.

In Class Programming

1. Classification:

Dataset: https://archive.ics.uci.edu/ml/datasets/Adult

We apply the following algorithms to the Adult dataset given above.

  • Naïve Bayes
  • Decision Tree
  • Random Forest

This is a classification problem. Column X in the dataset records whether a person makes more than 50K or not. The label is column X, while the features (predictors) are age, education-num and hours-per-week.

Creating the Spark session, loading the data and manipulating the data are exactly the same for all three algorithms, so these steps are shown here only once.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.ml.feature import VectorAssembler

    # Create the Spark session
    spark = SparkSession.builder.appName("DecisionTree App").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Load the data
    data = spark.read.format("csv").option("header", True) \
                                   .option("inferSchema", True) \
                                   .option("delimiter", ",") \
                                   .load("D:\\UMKC\\__Spring2020\\CS5590BDP\\Module-2\\Lesson-7\\MachineLearning\\data\\adult.data")

    data.printSchema()

    # Encode the label as 0/1 (the raw values carry a leading space)
    data = data.withColumn("X", F.when(F.col("X") == ' <=50K', 0).when(F.col("X") == ' >50K', 1))
    data = data.withColumnRenamed("X", "label")
    data = data.select(data.label.cast("double"), "age", "education-num", "hours-per-week")
    data.show()

    # Assemble the predictor columns into a single feature vector
    assembler = VectorAssembler(inputCols=data.columns[1:], outputCol="features")
    data = assembler.transform(data)
    data.show()

    # Split the data into training and test sets
    training, test = data.select("label", "features").randomSplit([0.70, 0.30])

a. Naïve Bayes

Naive Bayes is a simple multiclass classification algorithm that assumes independence between every pair of features. It can be trained very efficiently: in a single pass over the training data, it computes the conditional probability distribution of each feature given the label, then applies Bayes' theorem to compute the conditional probability distribution of the label given an observation and uses it for prediction.

NaiveBayes implements multinomial naive Bayes by default. In the RDD-based API it takes an RDD of LabeledPoint, an optional smoothing parameter lambda, and an optional model type parameter (default "multinomial"), and outputs a NaiveBayesModel, which can be used for evaluation and prediction. The source code below uses the DataFrame-based estimator from pyspark.ml instead, which is fit on a DataFrame with label and features columns.
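
The single-pass training and the Bayes' theorem prediction step described above can be sketched in plain Python. This is a minimal illustration, not Spark's implementation: the function names, the toy rows, and the use of additive smoothing with a default of 1.0 are assumptions made for the example.

```python
import math
from collections import defaultdict


def train_multinomial_nb(rows, smoothing=1.0):
    """rows: list of (label, feature_counts). Returns (log_priors, log_likelihoods)."""
    num_features = len(rows[0][1])
    label_counts = defaultdict(int)
    feature_sums = defaultdict(lambda: [0.0] * num_features)
    for label, features in rows:  # single pass over the training data
        label_counts[label] += 1
        for j, value in enumerate(features):
            feature_sums[label][j] += value
    total = len(rows)
    log_priors, log_likelihoods = {}, {}
    for label, count in label_counts.items():
        log_priors[label] = math.log(count / total)
        # Additive smoothing keeps unseen features from getting zero probability
        denom = sum(feature_sums[label]) + smoothing * num_features
        log_likelihoods[label] = [
            math.log((s + smoothing) / denom) for s in feature_sums[label]
        ]
    return log_priors, log_likelihoods


def predict(model, features):
    """Pick the label with the highest posterior score (computed in log space)."""
    log_priors, log_likelihoods = model
    scores = {
        label: log_priors[label]
        + sum(x * ll for x, ll in zip(features, log_likelihoods[label]))
        for label in log_priors
    }
    return max(scores, key=scores.get)


# Hypothetical toy data: (label, [count of feature 0, count of feature 1])
toy = [(0, [3, 0]), (0, [2, 1]), (1, [0, 3]), (1, [1, 2])]
model = train_multinomial_nb(toy)
print(predict(model, [4, 0]), predict(model, [0, 4]))
```

Working in log space avoids numeric underflow when many feature probabilities are multiplied together.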

Source code

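    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics

    # Create the Naive Bayes model and fit it on the training set
    nb = NaiveBayes()
    model = nb.fit(training)

    # Generate predictions for the test set
    pred = model.transform(test)

    # Evaluate accuracy (the evaluator's default metric is F1, so request accuracy explicitly)
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = evaluator.evaluate(pred)
    print("Accuracy:", accuracy)

    # Report: MulticlassMetrics expects an RDD of (prediction, label) pairs
    predAndLabels = pred.select("prediction", "label").rdd.map(tuple)
    metrics = MulticlassMetrics(predAndLabels)
    print("Confusion Matrix", metrics.confusionMatrix())
    # Per-class metrics for the positive class (label 1.0)
    print("Precision", metrics.precision(1.0))
    print("Recall", metrics.recall(1.0))
    print("F-measure", metrics.fMeasure(1.0))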

b. Decision Tree

Decision trees are widely used since they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions. The decision tree is a greedy algorithm that performs a recursive binary partitioning of the feature space. The tree predicts the same label for each bottommost (leaf) partition. Each partition is chosen greedily by selecting the best split from a set of possible splits, in order to maximize the information gain at a tree node.
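
The greedy choice of a split can be illustrated with a small plain-Python sketch. It scores candidate thresholds on one feature by the decrease in Gini impurity and picks the best one. Gini is only one possible impurity measure (entropy is another), and the function names and toy rows below are hypothetical.

```python
from collections import Counter


def gini(labels):
    """Gini impurity of a list of class labels."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())


def split_gain(rows, feature_index, threshold):
    """Impurity decrease from splitting rows on feature <= threshold.

    rows: list of (features, label).
    """
    left = [label for f, label in rows if f[feature_index] <= threshold]
    right = [label for f, label in rows if f[feature_index] > threshold]
    n = len(rows)
    parent = gini([label for _, label in rows])
    weighted = (len(left) / n) * gini(left) + (len(right) / n) * gini(right)
    return parent - weighted


def best_split(rows, feature_index):
    """Greedy step: try every observed value (except the largest) as a threshold."""
    candidates = sorted({f[feature_index] for f, _ in rows})[:-1]
    return max(candidates, key=lambda t: split_gain(rows, feature_index, t))


# Hypothetical toy rows: ([age], label)
toy = [([25], 0), ([30], 0), ([45], 1), ([50], 1)]
print(best_split(toy, 0), split_gain(toy, 0, 30))
```

A full tree repeats this step recursively on the left and right partitions until a stopping rule such as a maximum depth is reached.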

Source code

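    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics

    # Create the decision tree model and fit it on the training set
    dt = DecisionTreeClassifier()
    model = dt.fit(training)

    # Predictions
    pred = model.transform(test)

    # Accuracy (the evaluator's default metric is F1, so request accuracy explicitly)
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = evaluator.evaluate(pred)
    print("Accuracy", accuracy)

    # Report: MulticlassMetrics expects an RDD of (prediction, label) pairs
    predAndLabels = pred.select("prediction", "label").rdd.map(tuple)
    metrics = MulticlassMetrics(predAndLabels)
    print("Confusion Matrix", metrics.confusionMatrix())
    # Per-class metrics for the positive class (label 1.0)
    print("Precision", metrics.precision(1.0))
    print("Recall", metrics.recall(1.0))
    print("F-measure", metrics.fMeasure(1.0))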

c. Random Forest

Random forests are a popular family of classification and regression methods. Random forests are ensembles of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting.
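
One common way to combine the trees of an ensemble for classification is a majority vote. The plain-Python sketch below shows only that voting step. The three "trees" are hypothetical hand-written rules over (age, education-num, hours-per-week), not trained models, and Spark's own aggregation may differ in detail.

```python
from collections import Counter


def majority_vote(trees, features):
    """Return the label predicted by the largest number of trees."""
    votes = Counter(tree(features) for tree in trees)
    return votes.most_common(1)[0][0]


# Hypothetical stand-ins for trained trees: each is a rule on
# (age, education_num, hours_per_week) returning 0 or 1.
trees = [
    lambda f: 1 if f[0] > 40 else 0,
    lambda f: 1 if f[1] > 12 else 0,
    lambda f: 1 if f[2] > 45 else 0,
]

print(majority_vote(trees, (50, 13, 40)))
print(majority_vote(trees, (30, 10, 50)))
```

Because each tree sees a different sample of the data and features during training, individual mistakes tend to differ between trees, and the vote averages them out.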

Source code

    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.mllib.evaluation import MulticlassMetrics

    # Split the data into training and test sets
    training, test = data.select("label", "features").randomSplit([0.70, 0.30])

    # Create the random forest model and fit it on the training set
    rf = RandomForestClassifier()
    model = rf.fit(training)

    # Generate predictions for the test set
    pred = model.transform(test)

    # Evaluate accuracy (the evaluator's default metric is F1, so request accuracy explicitly)
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = evaluator.evaluate(pred)

    # Show model accuracy
    print("Accuracy:", accuracy)

    # Report: MulticlassMetrics expects an RDD of (prediction, label) pairs
    predictionAndLabels = pred.select("prediction", "label").rdd.map(tuple)
    metrics = MulticlassMetrics(predictionAndLabels)
    print("Confusion Matrix:", metrics.confusionMatrix())
    # Per-class metrics for the positive class (label 1.0)
    print("Precision:", metrics.precision(1.0))
    print("Recall:", metrics.recall(1.0))
    print("F-measure:", metrics.fMeasure(1.0))

2. Clustering:

Dataset: https://archive.ics.uci.edu/ml/datasets/Diabetes+130-US+hospitals+for+years+1999-2008

In order to identify diabetes, the following set of features are used to train the model.

  • admission_type_id
  • discharge_disposition_id
  • admission_source_id
  • time_in_hospital
  • num_lab_procedures

Source code

    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.clustering import KMeans

    # Create the Spark session
    spark = SparkSession.builder.appName("KMeans App").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Load the data
    data = spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter", ",") \
                           .load("D:\\UMKC\\__Spring2020\\CS5590BDP\\Module-2\\Lesson-7\\MachineLearning\\data\\diabetic_data.csv")

    data = data.select("admission_type_id", "discharge_disposition_id", "admission_source_id", "time_in_hospital", "num_lab_procedures")
    data.show()

    # Assemble the selected columns into a single feature vector
    assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
    data = assembler.transform(data)
    data.show()

    # Train a k-means model with two clusters
    kmeans = KMeans().setK(2).setSeed(1)
    model = kmeans.fit(data)

    # Assign each row to a cluster
    predictions = model.transform(data)

    # Show the cluster centers
    centers = model.clusterCenters()
    print("Cluster Centers: ")
    for center in centers:
        print(center)
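
What fitting a k-means model involves can be sketched in plain Python as alternating assignment and update steps (Lloyd's algorithm). This is an illustrative sketch with hypothetical function names, toy 2-D points, and hand-picked starting centers. It is not how Spark initializes or distributes the computation.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two points."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def assign(points, centers):
    """Assignment step: index of the nearest center for each point."""
    return [
        min(range(len(centers)), key=lambda k: squared_distance(p, centers[k]))
        for p in points
    ]


def update(points, labels, centers):
    """Update step: move each center to the mean of its assigned points."""
    new_centers = []
    for k, old in enumerate(centers):
        members = [p for p, label in zip(points, labels) if label == k]
        if not members:
            new_centers.append(old)  # keep an empty cluster's center unchanged
            continue
        new_centers.append(tuple(sum(col) / len(members) for col in zip(*members)))
    return new_centers


def kmeans(points, centers, max_iter=20):
    """Repeat assignment and update until the centers stop moving."""
    for _ in range(max_iter):
        labels = assign(points, centers)
        new_centers = update(points, labels, centers)
        if new_centers == centers:
            break
        centers = new_centers
    return centers, labels


points = [(1.0, 1.0), (1.0, 2.0), (8.0, 8.0), (9.0, 8.0)]
centers, labels = kmeans(points, centers=[(0.0, 0.0), (10.0, 10.0)])
print(centers, labels)
```

The returned centers play the same role as the values printed from `model.clusterCenters()`, and the labels correspond to the prediction column produced by `model.transform`.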

3. Regression:

Dataset : https://archive.ics.uci.edu/ml/datasets/Automobile

In order to predict the wheel-base, the following features are used to train the model:

  • length
  • width
  • height

Creating spark session and data loading

    from pyspark.sql import SparkSession

    # Create the Spark session
    spark = SparkSession.builder.appName("LinearReg App").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    # Load the data
    data = spark.read.format("csv").option("header", True).option("inferSchema", True).option("delimiter", ",") \
                           .load("D:\\UMKC\\__Spring2020\\CS5590BDP\\Module-2\\Lesson-7\\MachineLearning\\data\\imports-85.data")

    data.printSchema()

a. Linear Regression

Source code

    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.regression import LinearRegression

    # Use wheel-base as the label and keep the three predictor columns
    data = data.withColumnRenamed("wheel-base", "label").select("label", "length", "width", "height")
    data.show()

    # Assemble the predictor columns into a single feature vector
    assembler = VectorAssembler(inputCols=data.columns[1:], outputCol="features")
    data = assembler.transform(data)
    data.show()

    lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Fit the model
    model = lr.fit(data)

    # Print the coefficients and intercept for linear regression
    print("Coefficients: %s" % str(model.coefficients))
    print("Intercept: %s" % str(model.intercept))

    # Summarize the model over the training set and print out some metrics
    trainingSummary = model.summary
    print("numIterations: %d" % trainingSummary.totalIterations)
    print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
    trainingSummary.residuals.show()
    print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
    print("r2: %f" % trainingSummary.r2)
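
The two summary metrics printed above, RMSE and r2, follow standard definitions that are easy to check by hand. The plain-Python sketch below uses hypothetical label and prediction values rather than output from the automobile dataset.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: square root of the mean squared residual."""
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(labels))


def r2(labels, predictions):
    """Coefficient of determination: 1 - residual sum of squares / total sum of squares."""
    mean = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean) ** 2 for y in labels)
    return 1.0 - ss_res / ss_tot


# Hypothetical values for illustration only
labels = [1.0, 2.0, 3.0, 4.0]
predictions = [1.0, 2.0, 3.0, 5.0]
print(rmse(labels, predictions), r2(labels, predictions))
```

A lower RMSE means smaller residuals in the units of the label, while an r2 closer to 1 means the model explains more of the label's variance.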

b. Logistic Regression

Source code

    from pyspark.sql import functions as F
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.classification import LogisticRegression

    # Start from the freshly loaded data (it must still contain the num-of-doors column)
    # Label is 1 for four-door cars, 0 otherwise
    data = data.withColumn("label", F.when(F.col("num-of-doors") == "four", 1).otherwise(0)).select("label", "length", "width", "height")
    data.show()

    # Create vector assembler for feature columns
    assembler = VectorAssembler(inputCols=data.columns[1:], outputCol="features")
    data = assembler.transform(data)

    lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

    # Fit the model
    model = lr.fit(data)

    # Print the coefficients and intercept for logistic regression
    print("Coefficients: " + str(model.coefficients))
    print("Intercept: " + str(model.intercept))

    # We can also use the multinomial family for binary classification
    mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

    # Fit the model
    mlr_model = mlr.fit(data)

    # Print the coefficients and intercepts for logistic regression with multinomial family
    print("Multinomial coefficients: " + str(mlr_model.coefficientMatrix))
    print("Multinomial intercepts: " + str(mlr_model.interceptVector))
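
To show how the printed coefficients and intercept of a binary logistic regression turn into a prediction, the plain-Python sketch below applies the sigmoid function to a linear score. The coefficient values, the feature values, and the 0.5 threshold are hypothetical and chosen for illustration. They are not results from the automobile dataset.

```python
import math


def sigmoid(z):
    """Map a linear score to a probability between 0 and 1."""
    return 1.0 / (1.0 + math.exp(-z))


def predict_probability(coefficients, intercept, features):
    """Probability of label 1 given the features."""
    z = intercept + sum(w * x for w, x in zip(coefficients, features))
    return sigmoid(z)


def predict_label(coefficients, intercept, features, threshold=0.5):
    """Predict 1 when the probability exceeds the threshold, else 0."""
    probability = predict_probability(coefficients, intercept, features)
    return 1 if probability > threshold else 0


# Hypothetical model over (length, width, height)
coefficients = [0.02, 0.0, 0.0]
intercept = -3.4

print(predict_label(coefficients, intercept, [180.0, 66.0, 54.0]))
print(predict_label(coefficients, intercept, [150.0, 64.0, 52.0]))
```

With an L1-heavy elastic net penalty, as in the settings used above, some coefficients can shrink to exactly zero, which is why the hypothetical model here uses only the first feature.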

Bonus

The following tasks are carried out as part of each question in the previous part. Please check each question to see how they are done.

  1. Show confusion matrix for any machine learning algorithm
  2. Calculate Precision, Recall, and F1-score
  3. Inference on custom data for any algorithm of your own.
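
For reference, the confusion matrix counts and the precision, recall, and F1-score for one class can be computed by hand from (prediction, label) pairs. The plain-Python sketch below uses hypothetical pairs and function names, with 1.0 treated as the positive class.

```python
def confusion_counts(pairs, positive=1.0):
    """Count true/false positives and negatives from (prediction, label) pairs."""
    tp = sum(1 for p, y in pairs if p == positive and y == positive)
    fp = sum(1 for p, y in pairs if p == positive and y != positive)
    fn = sum(1 for p, y in pairs if p != positive and y == positive)
    tn = sum(1 for p, y in pairs if p != positive and y != positive)
    return tp, fp, fn, tn


def precision_recall_f1(pairs, positive=1.0):
    """Precision, recall and F1-score for the positive class."""
    tp, fp, fn, _ = confusion_counts(pairs, positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1


# Hypothetical (prediction, label) pairs
pairs = [(1.0, 1.0), (1.0, 0.0), (0.0, 1.0), (0.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
print(confusion_counts(pairs))
print(precision_recall_f1(pairs))
```

Precision answers how many predicted positives were correct, recall answers how many actual positives were found, and F1 is their harmonic mean.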

References:

https://spark.apache.org/docs/latest/ml-guide.html

https://www.edureka.co/blog/spark-mllib/