MLlib for Machine Learning
Module Duration: 4-5 hours
Focus: Distributed machine learning with MLlib
Prerequisites: Spark SQL, DataFrames, and basic ML concepts
Overview
MLlib is Apache Spark’s scalable machine learning library built on top of Spark Core. It provides high-quality algorithms for classification, regression, clustering, collaborative filtering, and more, all designed to scale horizontally across clusters.
Key Features
- Scalability: Train models on datasets too large for single machines.
- Speed: Leverage Spark’s in-memory computing for fast iterations.
- Ease of Use: High-level APIs in Scala, Java, Python, and R.
- Integration: Seamlessly integrate with Spark SQL, Streaming, and GraphX.
ML Pipelines
Pipeline Concepts
A pipeline chains multiple transformers and estimators to specify an ML workflow.
- Transformer: Algorithm that transforms a DataFrame (e.g., feature extraction, model).
- Estimator: Algorithm that fits on a DataFrame to produce a Transformer (e.g., learning algorithm).
- Pipeline: Chains multiple stages of transformers and estimators.
- Parameter: Common API for specifying parameters.
Basic Pipeline Example
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLPipeline").getOrCreate()
# Load data (assumed CSV with columns: feature1, feature2, feature3, label)
data = spark.read.csv("/data/sample_features.csv", header=True, inferSchema=True)
# Split data
train, test = data.randomSplit([0.8, 0.2], seed=42)
# Define pipeline stages
assembler = VectorAssembler(
inputCols=["feature1", "feature2", "feature3"],
outputCol="features"
)
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures",
withStd=True,
withMean=True
)
lr = LogisticRegression(
featuresCol="scaledFeatures",
labelCol="label",
maxIter=10
)
# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Train model
model = pipeline.fit(train)
# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()
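The fitted PipelineModel keeps a trained copy of every stage in order, so individual models can be inspected afterwards. A minimal sketch using the pipeline above:
# The trained logistic regression model is the last stage of the fitted pipeline
lrModel = model.stages[-1]
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")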
Scala Pipeline Example
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("MLPipeline")
.getOrCreate()
// Load data (assumed CSV with columns: feature1, feature2, feature3, label)
val data = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/data/sample_features.csv")
// Split data
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)
// Define pipeline stages
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(true)
val lr = new LogisticRegression()
.setFeaturesCol("scaledFeatures")
.setLabelCol("label")
.setMaxIter(10)
// Create and train pipeline
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler, lr))
val model = pipeline.fit(train)
// Make predictions
val predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()
Feature Engineering
VectorAssembler
Combine multiple columns into a feature vector.
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=["age", "income", "credit_score"],
outputCol="features"
)
df_with_features = assembler.transform(df)
StringIndexer
Convert string labels to numeric indices.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(
inputCol="category",
outputCol="categoryIndex"
)
indexed = indexer.fit(df).transform(df)
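To map indexed values back to the original string labels later (for example after classification), IndexToString reverses the mapping; a brief sketch reusing the indexed DataFrame above:
from pyspark.ml.feature import IndexToString
# IndexToString reads the label mapping from the metadata StringIndexer attached to the column
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
converted.select("category", "categoryIndex", "originalCategory").show()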
OneHotEncoder
Convert categorical indices to binary vectors.
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
inputCols=["categoryIndex"],
outputCols=["categoryVec"]
)
encoded = encoder.fit(indexed).transform(indexed)
StandardScaler
Standardize features by removing the mean and scaling to unit variance.
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures",
withStd=True,
withMean=True
)
scaled = scaler.fit(df).transform(df)
MinMaxScaler
Scale features to a specific range (default [0, 1]).
from pyspark.ml.feature import MinMaxScaler
mmScaler = MinMaxScaler(
inputCol="features",
outputCol="scaledFeatures",
min=0.0,
max=1.0
)
scaled = mmScaler.fit(df).transform(df)
Tokenizer and HashingTF
Process text data for ML.
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(df)
# Create term frequency features
hashingTF = HashingTF(
inputCol="words",
outputCol="rawFeatures",
numFeatures=1000
)
featurized = hashingTF.transform(words)
# Apply IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurized)
rescaled = idfModel.transform(featurized)
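HashingTF is fast but cannot map feature indices back to terms; CountVectorizer is an alternative that learns an explicit vocabulary. A short sketch reusing the tokenized words DataFrame above (the vocabSize and minDF values are illustrative):
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(
    inputCol="words",
    outputCol="rawFeatures",
    vocabSize=1000,
    minDF=2.0
)
cvModel = cv.fit(words)
featurized = cvModel.transform(words)
# The learned vocabulary lets you interpret feature indices
print(f"Vocabulary sample: {cvModel.vocabulary[:10]}")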
PCA (Principal Component Analysis)
Dimensionality reduction.
from pyspark.ml.feature import PCA
pca = PCA(
k=3, # Number of principal components
inputCol="features",
outputCol="pcaFeatures"
)
pcaModel = pca.fit(df)
result = pcaModel.transform(df)
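The fitted PCAModel also reports the proportion of variance captured by each component, which helps when choosing k; for example:
# Proportion of variance explained by each principal component
print(f"Explained variance: {pcaModel.explainedVariance}")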
Scala Feature Engineering
import org.apache.spark.ml.feature._
// VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("age", "income", "credit_score"))
.setOutputCol("features")
// StringIndexer
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
// OneHotEncoder
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex"))
.setOutputCols(Array("categoryVec"))
// StandardScaler
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(true)
// PCA
val pca = new PCA()
.setK(3)
.setInputCol("features")
.setOutputCol("pcaFeatures")
Classification
Logistic Regression
Binary and multiclass classification.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create classifier
lr = LogisticRegression(
featuresCol="features",
labelCol="label",
maxIter=10,
regParam=0.01,
elasticNetParam=0.8
)
# Train model
lrModel = lr.fit(train)
# Model coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")
# Make predictions
predictions = lrModel.transform(test)
# Evaluate
evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
Decision Trees
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(
featuresCol="features",
labelCol="label",
maxDepth=5,
maxBins=32,
impurity="gini"
)
dtModel = dt.fit(train)
# Feature importance
print(f"Feature Importance: {dtModel.featureImportances}")
# Tree structure
print(f"Tree:\n{dtModel.toDebugString}")
Random Forest
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(
featuresCol="features",
labelCol="label",
numTrees=100,
maxDepth=10,
maxBins=32,
seed=42
)
rfModel = rf.fit(train)
# Get predictions
predictions = rfModel.transform(test)
# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")
Gradient-Boosted Trees
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(
featuresCol="features",
labelCol="label",
maxIter=10,
maxDepth=5,
stepSize=0.1
)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
Naive Bayes
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(
featuresCol="features",
labelCol="label",
smoothing=1.0,
modelType="multinomial"
)
nbModel = nb.fit(train)
predictions = nbModel.transform(test)
Multilayer Perceptron (Neural Network)
from pyspark.ml.classification import MultilayerPerceptronClassifier
# Specify layers: input, hidden layers, output
layers = [4, 5, 4, 3] # 4 features, 2 hidden layers, 3 classes
mlp = MultilayerPerceptronClassifier(
featuresCol="features",
labelCol="label",
layers=layers,
blockSize=128,
seed=42,
maxIter=100
)
mlpModel = mlp.fit(train)
predictions = mlpModel.transform(test)
Scala Classification
import org.apache.spark.ml.classification._
// Logistic Regression
val lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("label")
.setMaxIter(10)
.setRegParam(0.01)
// Random Forest
val rf = new RandomForestClassifier()
.setNumTrees(100)
.setMaxDepth(10)
.setFeaturesCol("features")
.setLabelCol("label")
// GBT
val gbt = new GBTClassifier()
.setMaxIter(10)
.setMaxDepth(5)
.setFeaturesCol("features")
.setLabelCol("label")
Regression
Linear Regression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
# Create regressor
lr = LinearRegression(
featuresCol="features",
labelCol="label",
maxIter=10,
regParam=0.3,
elasticNetParam=0.8
)
# Train model
lrModel = lr.fit(train)
# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")
# Training summary
trainingSummary = lrModel.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"R2: {trainingSummary.r2}")
print(f"MAE: {trainingSummary.meanAbsoluteError}")
# Make predictions
predictions = lrModel.transform(test)
# Evaluate
evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")
Decision Tree Regression
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(
featuresCol="features",
labelCol="label",
maxDepth=5,
maxBins=32
)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
Random Forest Regression
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(
featuresCol="features",
labelCol="label",
numTrees=100,
maxDepth=10,
seed=42
)
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")
Gradient-Boosted Trees Regression
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(
featuresCol="features",
labelCol="label",
maxIter=10,
maxDepth=5,
stepSize=0.1
)
gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
Generalized Linear Regression
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(
featuresCol="features",
labelCol="label",
family="gaussian",
link="identity",
maxIter=10,
regParam=0.3
)
glrModel = glr.fit(train)
predictions = glrModel.transform(test)
# Summary
summary = glrModel.summary
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")
Clustering
K-Means
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Create clusterer
kmeans = KMeans(
featuresCol="features",
k=3,
seed=42,
maxIter=20
)
# Train model
kmeansModel = kmeans.fit(df)
# Get cluster centers
centers = kmeansModel.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)
# Make predictions
predictions = kmeansModel.transform(df)
# Evaluate clustering
evaluator = ClusteringEvaluator(
featuresCol="features",
predictionCol="prediction",
metricName="silhouette"
)
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")
# Within Set Sum of Squared Errors
wssse = kmeansModel.summary.trainingCost
print(f"WSSSE: {wssse}")
Bisecting K-Means
from pyspark.ml.clustering import BisectingKMeans
bkm = BisectingKMeans(
featuresCol="features",
k=5,
maxIter=20,
seed=42
)
bkmModel = bkm.fit(df)
predictions = bkmModel.transform(df)
# Cluster centers
centers = bkmModel.clusterCenters()
Gaussian Mixture Model
from pyspark.ml.clustering import GaussianMixture
gmm = GaussianMixture(
featuresCol="features",
k=3,
maxIter=20,
seed=42
)
gmmModel = gmm.fit(df)
# Get predictions with probabilities
predictions = gmmModel.transform(df)
predictions.select("features", "prediction", "probability").show()
# Gaussians (mean and covariance of each mixture component)
print("Gaussians:")
gmmModel.gaussiansDF.show(truncate=False)
LDA (Latent Dirichlet Allocation)
Topic modeling for text.
from pyspark.ml.clustering import LDA
lda = LDA(
featuresCol="features",
k=10, # Number of topics
maxIter=10,
seed=42
)
# 'corpus' is a DataFrame of documents with a term-count "features" column
ldaModel = lda.fit(corpus)
# Topics
topics = ldaModel.describeTopics(maxTermsPerTopic=10)
topics.show(truncate=False)
# Transform documents
transformed = ldaModel.transform(corpus)
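To compare different topic counts, the fitted model can score a document set; lower perplexity generally indicates a better fit. A short sketch:
# Model-fit diagnostics on the training corpus
ll = ldaModel.logLikelihood(corpus)
lp = ldaModel.logPerplexity(corpus)
print(f"Log likelihood: {ll}")
print(f"Log perplexity: {lp}")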
Collaborative Filtering
ALS (Alternating Least Squares)
Recommendation system using matrix factorization.
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Load ratings data
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)
# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)
# Create ALS model
als = ALS(
userCol="userId",
itemCol="movieId",
ratingCol="rating",
coldStartStrategy="drop",
nonnegative=True,
implicitPrefs=False,
rank=10,
maxIter=10,
regParam=0.1,
alpha=1.0
)
# Train model
alsModel = als.fit(train)
# Make predictions
predictions = alsModel.transform(test)
# Evaluate
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
# Generate recommendations
# Top 10 movie recommendations for each user
userRecs = alsModel.recommendForAllUsers(10)
userRecs.show(truncate=False)
# Top 10 user recommendations for each movie
movieRecs = alsModel.recommendForAllItems(10)
movieRecs.show(truncate=False)
# Recommendations for specific users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = alsModel.recommendForUserSubset(users, 10)
userSubsetRecs.show(truncate=False)
Scala ALS
import org.apache.spark.ml.recommendation.ALS
val ratings = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("/data/ratings.csv")
val Array(train, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)
val als = new ALS()
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
.setMaxIter(10)
.setRegParam(0.1)
.setRank(10)
.setColdStartStrategy("drop")
val model = als.fit(train)
// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)
Model Evaluation
Classification Metrics
from pyspark.ml.evaluation import (
BinaryClassificationEvaluator,
MulticlassClassificationEvaluator
)
# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol="rawPrediction",
metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Change metric
binary_evaluator.setMetricName("areaUnderPR")
aupr = binary_evaluator.evaluate(predictions)
print(f"AUPR: {aupr}")
# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction"
)
# Accuracy
multi_evaluator.setMetricName("accuracy")
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")
# F1 Score
multi_evaluator.setMetricName("f1")
f1 = multi_evaluator.evaluate(predictions)
print(f"F1: {f1}")
# Weighted Precision
multi_evaluator.setMetricName("weightedPrecision")
precision = multi_evaluator.evaluate(predictions)
print(f"Precision: {precision}")
# Weighted Recall
multi_evaluator.setMetricName("weightedRecall")
recall = multi_evaluator.evaluate(predictions)
print(f"Recall: {recall}")
Regression Metrics
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
labelCol="label",
predictionCol="prediction"
)
# RMSE
evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(predictions)
print(f"MSE: {mse}")
# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)
print(f"MAE: {mae}")
# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R2: {r2}")
Confusion Matrix
from pyspark.mllib.evaluation import MulticlassMetrics
# Convert to an RDD of (prediction, label) pairs for the RDD-based mllib metrics
predictionAndLabels = predictions.select("prediction", "label").rdd \
    .map(lambda row: (float(row[0]), float(row[1])))
metrics = MulticlassMetrics(predictionAndLabels)
# Confusion matrix
print(f"Confusion Matrix:\n{metrics.confusionMatrix()}")
# Overall statistics
print(f"Accuracy: {metrics.accuracy}")
print(f"Weighted Precision: {metrics.weightedPrecision}")
print(f"Weighted Recall: {metrics.weightedRecall}")
print(f"Weighted F1: {metrics.weightedFMeasure()}")
# Per-class metrics
labels = predictions.select("label").distinct().rdd.map(lambda x: x[0]).collect()
for label in sorted(labels):
    print(f"\nClass {label}:")
    print(f"  Precision: {metrics.precision(label)}")
    print(f"  Recall: {metrics.recall(label)}")
    print(f"  F1: {metrics.fMeasure(label)}")
Hyperparameter Tuning
Cross-Validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Create model
lr = LogisticRegression(featuresCol="features", labelCol="label")
# Build parameter grid
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.addGrid(lr.maxIter, [10, 20, 50]) \
.build()
# Create evaluator
evaluator = BinaryClassificationEvaluator(
labelCol="label",
metricName="areaUnderROC"
)
# Create cross-validator
cv = CrossValidator(
estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=5,
seed=42
)
# Train with cross-validation
cvModel = cv.fit(train)
# Best model
bestModel = cvModel.bestModel
print(f"Best regParam: {bestModel._java_obj.getRegParam()}")
print(f"Best elasticNetParam: {bestModel._java_obj.getElasticNetParam()}")
print(f"Best maxIter: {bestModel._java_obj.getMaxIter()}")
# Average metrics for each parameter combination
avgMetrics = cvModel.avgMetrics
for params, metric in zip(paramGrid, avgMetrics):
    print(f"{params} -> {metric}")
# Make predictions with best model
predictions = cvModel.transform(test)
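Cross-validation fits one model per fold for every grid point, so it can be slow; CrossValidator accepts a parallelism setting to train candidate models concurrently. A sketch of the same cross-validator with concurrent evaluation (the value 4 is illustrative):
# Evaluate up to 4 parameter settings in parallel
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4,
    seed=42
)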
Train-Validation Split
A faster alternative to cross-validation: each parameter combination is evaluated once on a single train/validation split rather than across k folds.
from pyspark.ml.tuning import TrainValidationSplit
# Build parameter grid
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0]) \
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
.build()
# Create train-validation split
tvs = TrainValidationSplit(
estimator=lr,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
trainRatio=0.8,
seed=42
)
# Train
tvsModel = tvs.fit(train)
# Best model
bestModel = tvsModel.bestModel
# Make predictions
predictions = tvsModel.transform(test)
Scala Hyperparameter Tuning
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
val paramGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.01, 0.1, 0.5))
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.addGrid(lr.maxIter, Array(10, 20, 50))
.build()
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setMetricName("areaUnderROC")
val cv = new CrossValidator()
.setEstimator(lr)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5)
.setSeed(42)
val cvModel = cv.fit(train)
val predictions = cvModel.transform(test)
Model Persistence
Save and Load Models
# Save pipeline model
model.write().overwrite().save("/models/lr_pipeline")
# Load pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/models/lr_pipeline")
# Save individual model
lrModel.write().overwrite().save("/models/logistic_regression")
# Load individual model
from pyspark.ml.classification import LogisticRegressionModel
loaded_lr = LogisticRegressionModel.load("/models/logistic_regression")
# Make predictions with loaded model
predictions = loaded_model.transform(test)
Scala Model Persistence
// Save model
model.write.overwrite().save("/models/lr_pipeline")
// Load model
import org.apache.spark.ml.PipelineModel
val loadedModel = PipelineModel.load("/models/lr_pipeline")
// Use loaded model
val predictions = loadedModel.transform(test)
Real-World Use Cases
Credit Risk Prediction
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Load credit data
credit = spark.read.csv("/data/credit.csv", header=True, inferSchema=True)
# Feature engineering
categorical_cols = ["employment_status", "loan_purpose", "home_ownership"]
numeric_cols = ["age", "income", "loan_amount", "credit_score", "debt_to_income"]
# Index categorical variables
indexers = [
StringIndexer(inputCol=col, outputCol=col+"_index")
for col in categorical_cols
]
# Assemble features
assembler = VectorAssembler(
inputCols=[col+"_index" for col in categorical_cols] + numeric_cols,
outputCol="features"
)
# Scale features
scaler = StandardScaler(
inputCol="features",
outputCol="scaledFeatures"
)
# Index label
labelIndexer = StringIndexer(inputCol="default", outputCol="label")
# Random Forest classifier
rf = RandomForestClassifier(
featuresCol="scaledFeatures",
labelCol="label",
numTrees=100,
maxDepth=10,
seed=42
)
# Create pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, labelIndexer, rf])
# Split data
train, test = credit.randomSplit([0.8, 0.2], seed=42)
# Train model
model = pipeline.fit(train)
# Evaluate
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Feature importance
rfModel = model.stages[-1]
feature_names = [col+"_index" for col in categorical_cols] + numeric_cols
importances = list(zip(feature_names, rfModel.featureImportances.toArray()))
sorted_importances = sorted(importances, key=lambda x: x[1], reverse=True)
print("\nTop 10 Important Features:")
for feature, importance in sorted_importances[:10]:
    print(f"{feature}: {importance:.4f}")
Customer Segmentation
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import avg, count
# Load customer data
customers = spark.read.csv("/data/customers.csv", header=True, inferSchema=True)
# Select features
feature_cols = [
"recency", "frequency", "monetary_value",
"avg_order_value", "customer_lifetime_value"
]
# Assemble and scale
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
# Prepare data
features_df = assembler.transform(customers)
scaled_df = scaler.fit(features_df).transform(features_df)
# Find optimal k using elbow method
wssses = []
for k in range(2, 11):
    kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=42)
    model = kmeans.fit(scaled_df)
    wssse = model.summary.trainingCost
    wssses.append((k, wssse))
    print(f"k={k}, WSSSE={wssse}")
# Train final model with chosen k
kmeans = KMeans(featuresCol="scaledFeatures", k=5, seed=42)
kmeansModel = kmeans.fit(scaled_df)
# Get cluster assignments
clustered = kmeansModel.transform(scaled_df)
# Analyze clusters
cluster_stats = clustered.groupBy("prediction").agg(
*[avg(col).alias(f"avg_{col}") for col in feature_cols],
count("*").alias("count")
)
cluster_stats.show()
Movie Recommendation System
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, explode
# Load ratings
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("/data/movies.csv", header=True, inferSchema=True)
# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)
# Build ALS model
als = ALS(
userCol="userId",
itemCol="movieId",
ratingCol="rating",
coldStartStrategy="drop",
rank=10,
maxIter=10,
regParam=0.1
)
# Train
model = als.fit(train)
# Evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
# Generate top 10 recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Show recommendations with movie titles
user_recs_exploded = userRecs.select(
"userId",
explode("recommendations").alias("rec")
).select(
"userId",
col("rec.movieId").alias("movieId"),
col("rec.rating").alias("predicted_rating")
).join(movies, "movieId")
user_recs_exploded.filter(col("userId") == 1).show(10, truncate=False)
Churn Prediction
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col
# Load customer data
customers = spark.read.csv("/data/customer_churn.csv", header=True, inferSchema=True)
# Feature columns
feature_cols = [
"tenure", "monthly_charges", "total_charges",
"num_services", "customer_service_calls",
"contract_type_index", "payment_method_index"
]
# Index categorical features
contract_indexer = StringIndexer(
inputCol="contract_type",
outputCol="contract_type_index"
)
payment_indexer = StringIndexer(
inputCol="payment_method",
outputCol="payment_method_index"
)
# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Index label
label_indexer = StringIndexer(inputCol="churn", outputCol="label")
# GBT Classifier
gbt = GBTClassifier(
featuresCol="features",
labelCol="label",
maxIter=20,
maxDepth=5
)
# Pipeline
pipeline = Pipeline(stages=[
contract_indexer,
payment_indexer,
assembler,
label_indexer,
gbt
])
# Train
train, test = customers.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)
# Predict
predictions = model.transform(test)
# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Show high-risk customers
# Rank by the predicted probability of churn (second element of the probability vector)
high_risk = predictions.filter(col("prediction") == 1.0) \
    .withColumn("churn_probability", vector_to_array(col("probability"))[1]) \
    .select("customer_id", "churn_probability", "prediction") \
    .orderBy(col("churn_probability").desc())
high_risk.show(20)
Performance Tips
Data Caching
# Cache training data for iterative algorithms
train.cache()
model = rf.fit(train)
train.unpersist()
Partition Tuning
# Repartition for better parallelism
df = df.repartition(200)
# Coalesce to reduce partitions after filtering
filtered = df.filter(col("active") == True).coalesce(50)
Broadcast Variables
from pyspark.sql.functions import broadcast
# Broadcast small dimension tables in joins
enriched = transactions.join(broadcast(products), "product_id")
Feature Selection
# Reduce feature dimensions with PCA
from pyspark.ml.feature import PCA
pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
reduced = pca.fit(df).transform(df)
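PCA builds new composite features; to keep a subset of the original, interpretable features instead, a supervised selector such as ChiSqSelector can pick the top features by a chi-squared test against the label (best suited to categorical features). A sketch, assuming a labeled DataFrame with a features vector:
from pyspark.ml.feature import ChiSqSelector
# Keep the 50 features most associated with the label
selector = ChiSqSelector(
    numTopFeatures=50,
    featuresCol="features",
    labelCol="label",
    outputCol="selectedFeatures"
)
selected = selector.fit(df).transform(df)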
Sampling for Development
# Use sampling for faster iteration during development
sample = df.sample(fraction=0.1, seed=42)
model = pipeline.fit(sample)
Common Pitfalls
Issue 1: Imbalanced Classes
Problem: Model biased toward majority class.
Solution:
# Calculate class weights
total = train.count()
positive = train.filter(col("label") == 1).count()
negative = train.filter(col("label") == 0).count()
weight_positive = total / (2.0 * positive)
weight_negative = total / (2.0 * negative)
# Add weight column
from pyspark.sql.functions import when
weighted = train.withColumn(
"weight",
when(col("label") == 1, weight_positive).otherwise(weight_negative)
)
# Use weighted training
lr = LogisticRegression(weightCol="weight")
Issue 2: Data Leakage
Problem: Using future information in features.
Solution:
# Split data by time, not randomly
train = df.filter(col("date") < "2023-01-01")
test = df.filter(col("date") >= "2023-01-01")
# Fit transformers only on training data
scaler = StandardScaler(inputCol="features", outputCol="scaled")
scaler_model = scaler.fit(train) # Fit only on train
train_scaled = scaler_model.transform(train)
test_scaled = scaler_model.transform(test)
Issue 3: Missing Values
Problem: Algorithms fail on null values.
Solution:
from pyspark.ml.feature import Imputer
imputer = Imputer(
inputCols=["age", "income"],
outputCols=["age_imputed", "income_imputed"],
strategy="mean" # or "median", "mode"
)
imputed = imputer.fit(df).transform(df)
Issue 4: Categorical Variables
Problem: Not encoding categorical features.
Solution:
# Always index and encode categoricals
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
# Or use VectorIndexer for automatic handling
from pyspark.ml.feature import VectorIndexer
indexer = VectorIndexer(
inputCol="features",
outputCol="indexed",
maxCategories=10 # Features with <= 10 distinct values are categorical
)
Hands-On Exercises
Exercise 1: Build a Classification Pipeline
Create a complete pipeline for binary classification.
# TODO: Implement classification pipeline
# 1. Load and explore data
# 2. Handle missing values
# 3. Encode categorical variables
# 4. Scale numerical features
# 5. Train multiple models
# 6. Compare performance
# 7. Tune hyperparameters
# Your code here
Exercise 2: Customer Segmentation
Perform clustering analysis on customer data.
# TODO: Customer segmentation
# 1. Load customer transaction data
# 2. Engineer RFM features
# 3. Find optimal number of clusters
# 4. Train K-means model
# 5. Analyze cluster characteristics
# 6. Visualize results
# Your code here
Exercise 3: Recommendation System
Build a movie recommendation system.
# TODO: Build recommender
# 1. Load ratings data
# 2. Train ALS model
# 3. Tune hyperparameters
# 4. Generate recommendations
# 5. Evaluate with RMSE
# 6. Handle cold-start problem
# Your code here
Summary
MLlib provides a comprehensive suite of distributed machine learning algorithms:
- Pipelines: Streamline ML workflows with transformers and estimators
- Feature Engineering: Rich set of transformers for data preparation
- Algorithms: Classification, regression, clustering, and recommendations
- Evaluation: Comprehensive metrics for model assessment
- Tuning: Cross-validation and parameter grid search
- Scalability: Handle datasets too large for single machines
Key Takeaways
- Always use pipelines for reproducible ML workflows
- Properly handle categorical variables with indexing and encoding
- Scale features for better algorithm performance
- Use cross-validation for robust hyperparameter tuning
- Cache data for iterative algorithms
- Monitor and address class imbalance
- Prevent data leakage with proper train/test splits
Next Steps
- Practice building end-to-end ML pipelines
- Experiment with hyperparameter tuning
- Explore feature engineering techniques
- Learn production model deployment
- Study advanced ensemble methods
Continue to the next module to master performance tuning and optimization techniques.