
MLlib for Machine Learning

Module Duration: 4-5 hours
Focus: Distributed machine learning with MLlib
Prerequisites: Spark SQL, DataFrames, and basic ML concepts

Overview

MLlib is Apache Spark’s scalable machine learning library built on top of Spark Core. It provides high-quality algorithms for classification, regression, clustering, collaborative filtering, and more, all designed to scale horizontally across clusters.

Key Features

  • Scalability: Train models on datasets too large for single machines.
  • Speed: Leverage Spark’s in-memory computing for fast iterations.
  • Ease of Use: High-level APIs in Scala, Java, Python, and R.
  • Integration: Seamlessly integrate with Spark SQL, Streaming, and GraphX.

ML Pipelines

Pipeline Concepts

A pipeline chains multiple transformers and estimators to specify an ML workflow.
  • Transformer: Algorithm that transforms a DataFrame (e.g., feature extraction, a fitted model).
  • Estimator: Algorithm that fits on a DataFrame to produce a Transformer (e.g., a learning algorithm).
  • Pipeline: Chains multiple stages of transformers and estimators.
  • Parameter: Common API for specifying parameters.

Basic Pipeline Example

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

# Load data (example path; assumes a tabular dataset with numeric columns
# feature1, feature2, feature3 and a numeric "label" column)
data = spark.read.csv("/data/sample_data.csv", header=True, inferSchema=True)

# Split data
train, test = data.randomSplit([0.8, 0.2], seed=42)

# Define pipeline stages
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

lr = LogisticRegression(
    featuresCol="scaledFeatures",
    labelCol="label",
    maxIter=10
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Train model
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()

Scala Pipeline Example

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MLPipeline")
  .getOrCreate()

// Load data (example path; assumes a tabular dataset with numeric columns
// feature1, feature2, feature3 and a numeric "label" column)
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/sample_data.csv")

// Split data
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

// Define pipeline stages
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val lr = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")
  .setMaxIter(10)

// Create and train pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, lr))

val model = pipeline.fit(train)

// Make predictions
val predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()

Feature Engineering

VectorAssembler

Combine multiple columns into a feature vector.
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "income", "credit_score"],
    outputCol="features"
)

df_with_features = assembler.transform(df)

StringIndexer

Convert string labels to numeric indices.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCol="category",
    outputCol="categoryIndex"
)

indexed = indexer.fit(df).transform(df)

OneHotEncoder

Convert categorical indices to binary vectors.
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["categoryIndex"],
    outputCols=["categoryVec"]
)

encoded = encoder.fit(indexed).transform(indexed)

StandardScaler

Standardize features by removing mean and scaling to unit variance.
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

scaled = scaler.fit(df).transform(df)

MinMaxScaler

Scale features to a specific range (default [0, 1]).
from pyspark.ml.feature import MinMaxScaler

mmScaler = MinMaxScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    min=0.0,
    max=1.0
)

scaled = mmScaler.fit(df).transform(df)

Tokenizer and HashingTF

Process text data for ML.
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(df)

# Create term frequency features
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=1000
)
featurized = hashingTF.transform(words)

# Apply IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurized)
rescaled = idfModel.transform(featurized)

PCA (Principal Component Analysis)

Dimensionality reduction.
from pyspark.ml.feature import PCA

pca = PCA(
    k=3,  # Number of principal components
    inputCol="features",
    outputCol="pcaFeatures"
)

pcaModel = pca.fit(df)
result = pcaModel.transform(df)
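
To help choose k, the fitted model also reports how much variance each component captures (explainedVariance on PCAModel). Reusing the pcaModel fitted above:
# Proportion of variance explained by each of the k components
print(f"Explained variance: {pcaModel.explainedVariance}")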

Scala Feature Engineering

import org.apache.spark.ml.feature._

// VectorAssembler
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income", "credit_score"))
  .setOutputCol("features")

// StringIndexer
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

// OneHotEncoder
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// StandardScaler
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

// PCA
val pca = new PCA()
  .setK(3)
  .setInputCol("features")
  .setOutputCol("pcaFeatures")

Classification

Logistic Regression

Binary and multiclass classification.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create classifier
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.8
)

# Train model
lrModel = lr.fit(train)

# Model coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Make predictions
predictions = lrModel.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

Decision Trees

from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    maxBins=32,
    impurity="gini"
)

dtModel = dt.fit(train)

# Feature importance
print(f"Feature Importance: {dtModel.featureImportances}")

# Tree structure
print(f"Tree:\n{dtModel.toDebugString}")

Random Forest

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    maxBins=32,
    seed=42
)

rfModel = rf.fit(train)

# Get predictions
predictions = rfModel.transform(test)

# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")

Gradient-Boosted Trees

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    maxDepth=5,
    stepSize=0.1
)

gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)

Naive Bayes

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    smoothing=1.0,
    modelType="multinomial"
)

nbModel = nb.fit(train)
predictions = nbModel.transform(test)

Multilayer Perceptron (Neural Network)

from pyspark.ml.classification import MultilayerPerceptronClassifier

# Specify layers: input, hidden layers, output
layers = [4, 5, 4, 3]  # 4 features, 2 hidden layers, 3 classes

mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    layers=layers,
    blockSize=128,
    seed=42,
    maxIter=100
)

mlpModel = mlp.fit(train)
predictions = mlpModel.transform(test)

Scala Classification

import org.apache.spark.ml.classification._

// Logistic Regression
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(10)
  .setRegParam(0.01)

// Random Forest
val rf = new RandomForestClassifier()
  .setNumTrees(100)
  .setMaxDepth(10)
  .setFeaturesCol("features")
  .setLabelCol("label")

// GBT
val gbt = new GBTClassifier()
  .setMaxIter(10)
  .setMaxDepth(5)
  .setFeaturesCol("features")
  .setLabelCol("label")

Regression

Linear Regression

from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create regressor
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8
)

# Train model
lrModel = lr.fit(train)

# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Training summary
trainingSummary = lrModel.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"R2: {trainingSummary.r2}")
print(f"MAE: {trainingSummary.meanAbsoluteError}")

# Make predictions
predictions = lrModel.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")

Decision Tree Regression

from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    maxBins=32
)

dtModel = dt.fit(train)
predictions = dtModel.transform(test)

Random Forest Regression

from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    seed=42
)

rfModel = rf.fit(train)
predictions = rfModel.transform(test)

# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")

Gradient-Boosted Trees Regression

from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    maxDepth=5,
    stepSize=0.1
)

gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)

Generalized Linear Regression

from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(
    featuresCol="features",
    labelCol="label",
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3
)

glrModel = glr.fit(train)
predictions = glrModel.transform(test)

# Summary
summary = glrModel.summary
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")

Clustering

K-Means

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Create clusterer
kmeans = KMeans(
    featuresCol="features",
    k=3,
    seed=42,
    maxIter=20
)

# Train model
kmeansModel = kmeans.fit(df)

# Get cluster centers
centers = kmeansModel.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)

# Make predictions
predictions = kmeansModel.transform(df)

# Evaluate clustering
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette"
)

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

# Within Set Sum of Squared Errors
wssse = kmeansModel.summary.trainingCost
print(f"WSSSE: {wssse}")

Bisecting K-Means

from pyspark.ml.clustering import BisectingKMeans

bkm = BisectingKMeans(
    featuresCol="features",
    k=5,
    maxIter=20,
    seed=42
)

bkmModel = bkm.fit(df)
predictions = bkmModel.transform(df)

# Cluster centers
centers = bkmModel.clusterCenters()

Gaussian Mixture Model

from pyspark.ml.clustering import GaussianMixture

gmm = GaussianMixture(
    featuresCol="features",
    k=3,
    maxIter=20,
    seed=42
)

gmmModel = gmm.fit(df)

# Get predictions with probabilities
predictions = gmmModel.transform(df)
predictions.select("features", "prediction", "probability").show()

# Parameters of each Gaussian (mean vector and covariance matrix)
print("Gaussians:")
gmmModel.gaussiansDF.show(truncate=False)

LDA (Latent Dirichlet Allocation)

Topic modeling for text.
from pyspark.ml.clustering import LDA

lda = LDA(
    featuresCol="features",
    k=10,  # Number of topics
    maxIter=10,
    seed=42
)

ldaModel = lda.fit(corpus)

# Topics
topics = ldaModel.describeTopics(maxTermsPerTopic=10)
topics.show(truncate=False)

# Transform documents
transformed = ldaModel.transform(corpus)
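
The corpus DataFrame above is assumed to already have a "features" column of term-count vectors. One way to build it from raw text is sketched below with Tokenizer and CountVectorizer; the docs DataFrame and its "text" column are illustrative assumptions:
from pyspark.ml.feature import Tokenizer, CountVectorizer

# Tokenize raw documents (assumes a DataFrame `docs` with a "text" column)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(docs)

# Count term frequencies into the "features" column expected by LDA
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=10000, minDF=2)
corpus = cv.fit(words).transform(words)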

Collaborative Filtering

ALS (Alternating Least Squares)

Recommendation system using matrix factorization.
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Load ratings data
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)

# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Create ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1,
    alpha=1.0
)

# Train model
alsModel = als.fit(train)

# Make predictions
predictions = alsModel.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# Generate recommendations
# Top 10 movie recommendations for each user
userRecs = alsModel.recommendForAllUsers(10)
userRecs.show(truncate=False)

# Top 10 user recommendations for each movie
movieRecs = alsModel.recommendForAllItems(10)
movieRecs.show(truncate=False)

# Recommendations for specific users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = alsModel.recommendForUserSubset(users, 10)
userSubsetRecs.show(truncate=False)

Scala ALS

import org.apache.spark.ml.recommendation.ALS

val ratings = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/ratings.csv")

val Array(train, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setMaxIter(10)
  .setRegParam(0.1)
  .setRank(10)
  .setColdStartStrategy("drop")

val model = als.fit(train)

// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)

Model Evaluation

Classification Metrics

from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator
)

# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Change metric
binary_evaluator.setMetricName("areaUnderPR")
aupr = binary_evaluator.evaluate(predictions)
print(f"AUPR: {aupr}")

# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

# Accuracy
multi_evaluator.setMetricName("accuracy")
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# F1 Score
multi_evaluator.setMetricName("f1")
f1 = multi_evaluator.evaluate(predictions)
print(f"F1: {f1}")

# Weighted Precision
multi_evaluator.setMetricName("weightedPrecision")
precision = multi_evaluator.evaluate(predictions)
print(f"Precision: {precision}")

# Weighted Recall
multi_evaluator.setMetricName("weightedRecall")
recall = multi_evaluator.evaluate(predictions)
print(f"Recall: {recall}")

Regression Metrics

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

# RMSE
evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(predictions)
print(f"MSE: {mse}")

# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)
print(f"MAE: {mae}")

# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R2: {r2}")

Confusion Matrix

from pyspark.mllib.evaluation import MulticlassMetrics

# Convert to an RDD of (prediction, label) pairs for the RDD-based mllib metrics
predictionAndLabels = predictions.select("prediction", "label").rdd \
    .map(lambda row: (float(row.prediction), float(row.label)))

metrics = MulticlassMetrics(predictionAndLabels)

# Confusion matrix
print(f"Confusion Matrix:\n{metrics.confusionMatrix()}")

# Overall statistics
print(f"Accuracy: {metrics.accuracy}")
print(f"Weighted Precision: {metrics.weightedPrecision}")
print(f"Weighted Recall: {metrics.weightedRecall}")
print(f"Weighted F1: {metrics.weightedFMeasure()}")

# Per-class metrics
labels = predictions.select("label").distinct().rdd.map(lambda x: x[0]).collect()
for label in sorted(labels):
    print(f"\nClass {label}:")
    print(f"  Precision: {metrics.precision(label)}")
    print(f"  Recall: {metrics.recall(label)}")
    print(f"  F1: {metrics.fMeasure(label)}")

Hyperparameter Tuning

Cross-Validation

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 20, 50]) \
    .build()

# Create evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

# Create cross-validator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

# Train with cross-validation
cvModel = cv.fit(train)

# Best model (in Spark 3.x the tuned param values are exposed on the fitted model)
bestModel = cvModel.bestModel
print(f"Best regParam: {bestModel.getRegParam()}")
print(f"Best elasticNetParam: {bestModel.getElasticNetParam()}")
print(f"Best maxIter: {bestModel.getMaxIter()}")

# Average metrics for each parameter combination
avgMetrics = cvModel.avgMetrics
for params, metric in zip(paramGrid, avgMetrics):
    print(f"{params} -> {metric}")

# Make predictions with best model
predictions = cvModel.transform(test)
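
CrossValidator accepts any Estimator, so you can pass a whole Pipeline instead of a single algorithm; stages that need fitting (indexers, scalers) are then refit on each training fold, which also keeps validation data out of the feature statistics. A minimal sketch, reusing the assembler from the feature engineering section plus the lr, paramGrid, and evaluator defined above:
from pyspark.ml import Pipeline

# Tune the whole pipeline; grid entries still address lr's params
pipeline = Pipeline(stages=[assembler, lr])

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

cvPipelineModel = cv.fit(train)
predictions = cvPipelineModel.transform(test)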

Train-Validation Split

Faster alternative to cross-validation.
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Create train-validation split
tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=42
)

# Train
tvsModel = tvs.fit(train)

# Best model
bestModel = tvsModel.bestModel

# Make predictions
predictions = tvsModel.transform(test)

Scala Hyperparameter Tuning

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 0.5))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .addGrid(lr.maxIter, Array(10, 20, 50))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setMetricName("areaUnderROC")

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setSeed(42)

val cvModel = cv.fit(train)
val predictions = cvModel.transform(test)

Model Persistence

Save and Load Models

# Save pipeline model
model.write().overwrite().save("/models/lr_pipeline")

# Load pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/models/lr_pipeline")

# Save individual model
lrModel.write().overwrite().save("/models/logistic_regression")

# Load individual model
from pyspark.ml.classification import LogisticRegressionModel
loaded_lr = LogisticRegressionModel.load("/models/logistic_regression")

# Make predictions with loaded model
predictions = loaded_model.transform(test)

Scala Model Persistence

// Save model
model.write.overwrite().save("/models/lr_pipeline")

// Load model
import org.apache.spark.ml.PipelineModel
val loadedModel = PipelineModel.load("/models/lr_pipeline")

// Use loaded model
val predictions = loadedModel.transform(test)

Real-World Use Cases

Credit Risk Prediction

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load credit data
credit = spark.read.csv("/data/credit.csv", header=True, inferSchema=True)

# Feature engineering
categorical_cols = ["employment_status", "loan_purpose", "home_ownership"]
numeric_cols = ["age", "income", "loan_amount", "credit_score", "debt_to_income"]

# Index categorical variables
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_index")
    for col in categorical_cols
]

# Assemble features
assembler = VectorAssembler(
    inputCols=[col+"_index" for col in categorical_cols] + numeric_cols,
    outputCol="features"
)

# Scale features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures"
)

# Index label
labelIndexer = StringIndexer(inputCol="default", outputCol="label")

# Random Forest classifier
rf = RandomForestClassifier(
    featuresCol="scaledFeatures",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, labelIndexer, rf])

# Split data
train, test = credit.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train)

# Evaluate
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Feature importance
rfModel = model.stages[-1]
feature_names = [col+"_index" for col in categorical_cols] + numeric_cols
importances = list(zip(feature_names, rfModel.featureImportances.toArray()))
sorted_importances = sorted(importances, key=lambda x: x[1], reverse=True)

print("\nTop 10 Important Features:")
for feature, importance in sorted_importances[:10]:
    print(f"{feature}: {importance:.4f}")

Customer Segmentation

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import avg, count

# Load customer data
customers = spark.read.csv("/data/customers.csv", header=True, inferSchema=True)

# Select features
feature_cols = [
    "recency", "frequency", "monetary_value",
    "avg_order_value", "customer_lifetime_value"
]

# Assemble and scale
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Prepare data
features_df = assembler.transform(customers)
scaled_df = scaler.fit(features_df).transform(features_df)

# Find optimal k using elbow method
wssses = []
for k in range(2, 11):
    kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=42)
    model = kmeans.fit(scaled_df)
    wssse = model.summary.trainingCost
    wssses.append((k, wssse))
    print(f"k={k}, WSSSE={wssse}")

# Train final model with chosen k
kmeans = KMeans(featuresCol="scaledFeatures", k=5, seed=42)
kmeansModel = kmeans.fit(scaled_df)

# Get cluster assignments
clustered = kmeansModel.transform(scaled_df)

# Analyze clusters
cluster_stats = clustered.groupBy("prediction").agg(
    *[avg(col).alias(f"avg_{col}") for col in feature_cols],
    count("*").alias("count")
)
cluster_stats.show()

Movie Recommendation System

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, explode

# Load ratings
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("/data/movies.csv", header=True, inferSchema=True)

# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    rank=10,
    maxIter=10,
    regParam=0.1
)

# Train
model = als.fit(train)

# Evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# Generate top 10 recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Show recommendations with movie titles
user_recs_exploded = userRecs.select(
    "userId",
    explode("recommendations").alias("rec")
).select(
    "userId",
    col("rec.movieId").alias("movieId"),
    col("rec.rating").alias("predicted_rating")
).join(movies, "movieId")

user_recs_exploded.filter(col("userId") == 1).show(10, truncate=False)

Churn Prediction

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

# Load customer data
customers = spark.read.csv("/data/customer_churn.csv", header=True, inferSchema=True)

# Feature columns
feature_cols = [
    "tenure", "monthly_charges", "total_charges",
    "num_services", "customer_service_calls",
    "contract_type_index", "payment_method_index"
]

# Index categorical features
contract_indexer = StringIndexer(
    inputCol="contract_type",
    outputCol="contract_type_index"
)
payment_indexer = StringIndexer(
    inputCol="payment_method",
    outputCol="payment_method_index"
)

# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Index label
label_indexer = StringIndexer(inputCol="churn", outputCol="label")

# GBT Classifier
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
    maxDepth=5
)

# Pipeline
pipeline = Pipeline(stages=[
    contract_indexer,
    payment_indexer,
    assembler,
    label_indexer,
    gbt
])

# Train
train, test = customers.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)

# Predict
predictions = model.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Show high-risk customers, ranked by predicted churn probability
# (the probability vector cannot be sorted directly, so extract the positive-class entry)
high_risk = predictions.filter(col("prediction") == 1.0) \
    .withColumn("churn_probability", vector_to_array("probability")[1]) \
    .select("customer_id", "churn_probability", "prediction") \
    .orderBy(col("churn_probability").desc())
high_risk.show(20)

Performance Tips

Data Caching

# Cache training data for iterative algorithms
train.cache()
model = rf.fit(train)
train.unpersist()

Partition Tuning

# Repartition for better parallelism
df = df.repartition(200)

# Coalesce to reduce partitions after filtering
filtered = df.filter(col("active") == True).coalesce(50)

Broadcast Variables

from pyspark.sql.functions import broadcast

# Broadcast small dimension tables in joins
enriched = transactions.join(broadcast(products), "product_id")

Feature Selection

# Reduce feature dimensions with PCA
from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
reduced = pca.fit(df).transform(df)

Sampling for Development

# Use sampling for faster iteration during development
sample = df.sample(fraction=0.1, seed=42)
model = pipeline.fit(sample)
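
For classification data, a plain sample() can skew the class mix in a small sample. sampleBy draws each label value at a chosen fraction instead; the 0.0/1.0 keys below assume a binary label column and must match the actual label values:
# Stratified 10% sample that keeps roughly the original class balance
sample = df.sampleBy("label", fractions={0.0: 0.1, 1.0: 0.1}, seed=42)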

Common Pitfalls

Issue 1: Imbalanced Classes

Problem: Model biased toward majority class. Solution:
from pyspark.sql.functions import col, when

# Calculate class weights
total = train.count()
positive = train.filter(col("label") == 1).count()
negative = train.filter(col("label") == 0).count()

weight_positive = total / (2.0 * positive)
weight_negative = total / (2.0 * negative)

# Add weight column
weighted = train.withColumn(
    "weight",
    when(col("label") == 1, weight_positive).otherwise(weight_negative)
)

# Use weighted training
lr = LogisticRegression(weightCol="weight")

Issue 2: Data Leakage

Problem: Using future information in features. Solution:
# Split data by time, not randomly
train = df.filter(col("date") < "2023-01-01")
test = df.filter(col("date") >= "2023-01-01")

# Fit transformers only on training data
scaler = StandardScaler(inputCol="features", outputCol="scaled")
scaler_model = scaler.fit(train)  # Fit only on train
train_scaled = scaler_model.transform(train)
test_scaled = scaler_model.transform(test)

Issue 3: Missing Values

Problem: Algorithms fail on null values. Solution:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["age", "income"],
    outputCols=["age_imputed", "income_imputed"],
    strategy="mean"  # or "median", "mode"
)

imputed = imputer.fit(df).transform(df)

Issue 4: Categorical Variables

Problem: Not encoding categorical features. Solution:
# Always index and encode categoricals
from pyspark.ml.feature import StringIndexer, OneHotEncoder

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])

# Or use VectorIndexer for automatic handling
from pyspark.ml.feature import VectorIndexer

indexer = VectorIndexer(
    inputCol="features",
    outputCol="indexed",
    maxCategories=10  # Features with <= 10 distinct values are categorical
)

Hands-On Exercises

Exercise 1: Build a Classification Pipeline

Create a complete pipeline for binary classification.
# TODO: Implement classification pipeline
# 1. Load and explore data
# 2. Handle missing values
# 3. Encode categorical variables
# 4. Scale numerical features
# 5. Train multiple models
# 6. Compare performance
# 7. Tune hyperparameters

# Your code here

Exercise 2: Customer Segmentation

Perform clustering analysis on customer data.
# TODO: Customer segmentation
# 1. Load customer transaction data
# 2. Engineer RFM features
# 3. Find optimal number of clusters
# 4. Train K-means model
# 5. Analyze cluster characteristics
# 6. Visualize results

# Your code here

Exercise 3: Recommendation System

Build a movie recommendation system.
# TODO: Build recommender
# 1. Load ratings data
# 2. Train ALS model
# 3. Tune hyperparameters
# 4. Generate recommendations
# 5. Evaluate with RMSE
# 6. Handle cold-start problem

# Your code here

Summary

MLlib provides a comprehensive suite of distributed machine learning algorithms:
  • Pipelines: Streamline ML workflows with transformers and estimators
  • Feature Engineering: Rich set of transformers for data preparation
  • Algorithms: Classification, regression, clustering, and recommendations
  • Evaluation: Comprehensive metrics for model assessment
  • Tuning: Cross-validation and parameter grid search
  • Scalability: Handle datasets too large for single machines

Key Takeaways

  1. Always use pipelines for reproducible ML workflows
  2. Properly handle categorical variables with indexing and encoding
  3. Scale features for better algorithm performance
  4. Use cross-validation for robust hyperparameter tuning
  5. Cache data for iterative algorithms
  6. Monitor and address class imbalance
  7. Prevent data leakage with proper train/test splits

Next Steps

  • Practice building end-to-end ML pipelines
  • Experiment with hyperparameter tuning
  • Explore feature engineering techniques
  • Learn production model deployment
  • Study advanced ensemble methods

Continue to the next module to master performance tuning and optimization techniques.