# MLlib for Machine Learning

> Distributed machine learning at scale

<Info>
  **Module Duration**: 4-5 hours
  **Focus**: Distributed machine learning with MLlib
  **Prerequisites**: Spark SQL, DataFrames, and basic ML concepts
</Info>

## Overview

MLlib is Apache Spark's scalable machine learning library built on top of Spark Core. It provides high-quality algorithms for classification, regression, clustering, collaborative filtering, and more, all designed to scale horizontally across clusters.

### Key Features

**Scalability**: Train models on datasets too large for single machines.

**Speed**: Leverage Spark's in-memory computing for fast iterations.

**Ease of Use**: High-level APIs in Scala, Java, Python, and R.

**Integration**: Seamlessly integrate with Spark SQL, Streaming, and GraphX.

## ML Pipelines

### Pipeline Concepts

A pipeline chains multiple transformers and estimators to specify an ML workflow.

**Transformer**: An algorithm that transforms one DataFrame into another, usually by appending columns (e.g., a feature transformer or a fitted model).

**Estimator**: An algorithm that is fit on a DataFrame to produce a Transformer (e.g., a learning algorithm such as `LogisticRegression`).

**Pipeline**: Chains multiple stages of transformers and estimators.

**Parameter**: Common API for specifying parameters.
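The Estimator/Transformer contract can be illustrated without Spark. The sketch below is plain Python, not MLlib code: `MeanCenterer`, `MeanCentererModel`, and `ToyPipeline` are hypothetical names, and rows are plain dicts rather than DataFrame rows. It only shows the shape of the API: `fit` learns state from data and returns a transformer, and a pipeline fits each stage on the output of the stages before it.

```python
class MeanCentererModel:
    """Transformer: holds learned state and applies it to rows."""
    def __init__(self, column, mean):
        self.column = column
        self.mean = mean

    def transform(self, rows):
        # Return new rows; the input rows are left unchanged.
        return [{**r, self.column: r[self.column] - self.mean} for r in rows]


class MeanCenterer:
    """Estimator: learns the column mean, then returns a transformer."""
    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        mean = sum(r[self.column] for r in rows) / len(rows)
        return MeanCentererModel(self.column, mean)


class ToyPipeline:
    """Fits stages in order, feeding each stage the transformed rows."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators have fit(); plain transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return fitted


train_rows = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
models = ToyPipeline([MeanCenterer("x")]).fit(train_rows)

# The fitted model reuses the training mean (2.0) on unseen data.
test_rows = [{"x": 10.0}]
for m in models:
    test_rows = m.transform(test_rows)
print(test_rows)  # [{'x': 8.0}]
```

The fitted stage carries the training mean with it, which is why a fitted pipeline can be applied to test data without recomputing statistics on that data.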

### Basic Pipeline Example

```python theme={null}
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MLPipeline").getOrCreate()

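# Load data (placeholder path; assumes numeric columns feature1..feature3
# and a numeric "label" column)
data = spark.read.csv("/data/sample_data.csv", header=True, inferSchema=True)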

# Split data
train, test = data.randomSplit([0.8, 0.2], seed=42)

# Define pipeline stages
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

lr = LogisticRegression(
    featuresCol="scaledFeatures",
    labelCol="label",
    maxIter=10
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Train model
model = pipeline.fit(train)

# Make predictions
predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()
```

### Scala Pipeline Example

```scala theme={null}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MLPipeline")
  .getOrCreate()

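// Load data (placeholder path; assumes numeric columns feature1..feature3
// and a numeric "label" column)
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/sample_data.csv")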

// Split data
val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42)

// Define pipeline stages
val assembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val lr = new LogisticRegression()
  .setFeaturesCol("scaledFeatures")
  .setLabelCol("label")
  .setMaxIter(10)

// Create and train pipeline
val pipeline = new Pipeline()
  .setStages(Array(assembler, scaler, lr))

val model = pipeline.fit(train)

// Make predictions
val predictions = model.transform(test)
predictions.select("prediction", "label", "probability").show()
```

## Feature Engineering

### VectorAssembler

Combine multiple columns into a feature vector.

```python theme={null}
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["age", "income", "credit_score"],
    outputCol="features"
)

df_with_features = assembler.transform(df)
```

### StringIndexer

Convert string labels to numeric indices.

```python theme={null}
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(
    inputCol="category",
    outputCol="categoryIndex"
)

indexed = indexer.fit(df).transform(df)
```

### OneHotEncoder

Convert categorical indices to binary vectors.

```python theme={null}
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["categoryIndex"],
    outputCols=["categoryVec"]
)

encoded = encoder.fit(indexed).transform(indexed)
```
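To make the indexing and encoding steps concrete, here is a plain-Python sketch of what the two stages compute under their default settings. It is an illustration, not MLlib's implementation: `fit_string_index` and `one_hot` are hypothetical helpers, and the output uses dense lists where MLlib produces sparse vectors. By default `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category (`dropLast=True`), so `k` categories become a vector of length `k - 1`.

```python
from collections import Counter

def fit_string_index(values):
    # Most frequent label gets index 0; ties are broken alphabetically here.
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    # With drop_last, the final category is encoded as the all-zeros vector.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

categories = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_index(categories)
print(mapping)  # {'a': 0.0, 'c': 1.0, 'b': 2.0}

for label in ["a", "c", "b"]:
    print(label, one_hot(mapping[label], len(mapping)))
# a [1.0, 0.0]
# c [0.0, 1.0]
# b [0.0, 0.0]
```

Dropping the last category avoids a redundant column, since the dropped category is implied when all other positions are zero.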

### StandardScaler

Standardize features by removing mean and scaling to unit variance.

```python theme={null}
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True
)

scaled = scaler.fit(df).transform(df)
```
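As a worked example of the arithmetic, the plain-Python sketch below standardizes a single column by hand. The values are made up for illustration. It uses the sample standard deviation (the `n - 1` denominator), which is what the MLlib documentation describes for `StandardScaler`.

```python
import statistics

values = [2.0, 4.0, 6.0]

mean = statistics.mean(values)   # 4.0
std = statistics.stdev(values)   # sample std (n - 1 denominator): 2.0

# withMean=True subtracts the mean; withStd=True divides by the std.
scaled = [(v - mean) / std for v in values]
print(scaled)  # [-1.0, 0.0, 1.0]
```

Note that `withMean=True` produces dense output, so it can be expensive on very sparse feature vectors.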

### MinMaxScaler

Scale features to a specific range (default \[0, 1]).

```python theme={null}
from pyspark.ml.feature import MinMaxScaler

mmScaler = MinMaxScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    min=0.0,
    max=1.0
)

scaled = mmScaler.fit(df).transform(df)
```
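The rescaling itself is a one-line formula per feature. The sketch below is plain Python rather than MLlib code, and `min_max_scale` is a hypothetical helper; it follows the formula in the MLlib documentation, including the convention that a constant column maps to the midpoint of the target range.

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    col_min, col_max = min(values), max(values)
    if col_max == col_min:
        # A constant column maps to the midpoint of the target range.
        return [0.5 * (lo + hi)] * len(values)
    return [(v - col_min) / (col_max - col_min) * (hi - lo) + lo for v in values]

print(min_max_scale([0.0, 5.0, 10.0]))             # [0.0, 0.5, 1.0]
print(min_max_scale([0.0, 5.0, 10.0], -1.0, 1.0))  # [-1.0, 0.0, 1.0]
print(min_max_scale([7.0, 7.0]))                   # [0.5, 0.5]
```

Because the minimum and maximum come from the data the scaler was fit on, values outside that range in new data can fall outside the target range.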

### Tokenizer and HashingTF

Process text data for ML.

```python theme={null}
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words = tokenizer.transform(df)

# Create term frequency features
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=1000
)
featurized = hashingTF.transform(words)

# Apply IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurized)
rescaled = idfModel.transform(featurized)
```
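To see what the IDF stage computes, the following plain-Python sketch works through a three-document toy corpus. It is not MLlib code: terms are keyed by their string instead of by a hashed index (which is what `HashingTF` would produce), and the corpus is made up. The IDF formula is the smoothed form given in the MLlib documentation, `log((|D| + 1) / (DF + 1))`.

```python
import math

docs = [
    ["spark", "ml", "spark"],
    ["spark", "sql"],
    ["ml", "pipeline"],
]

num_docs = len(docs)
vocab = sorted({w for d in docs for w in d})

# Document frequency: number of documents containing each term.
doc_freq = {t: sum(t in d for d in docs) for t in vocab}

# Smoothed IDF: log((|D| + 1) / (DF + 1)).
idf = {t: math.log((num_docs + 1) / (doc_freq[t] + 1)) for t in vocab}

for term in vocab:
    print(term, doc_freq[term], round(idf[term], 4))
# ml 2 0.2877
# pipeline 1 0.6931
# spark 2 0.2877
# sql 1 0.6931

# TF-IDF for the first document: raw term count times IDF.
tf = {t: docs[0].count(t) for t in set(docs[0])}
tfidf = {t: tf[t] * idf[t] for t in tf}
```

Terms that appear in fewer documents receive a higher IDF, so they contribute more to the final feature vector than common terms.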

### PCA (Principal Component Analysis)

Dimensionality reduction.

```python theme={null}
from pyspark.ml.feature import PCA

pca = PCA(
    k=3,  # Number of principal components
    inputCol="features",
    outputCol="pcaFeatures"
)

pcaModel = pca.fit(df)
result = pcaModel.transform(df)
```

### Scala Feature Engineering

```scala theme={null}
import org.apache.spark.ml.feature._

// VectorAssembler
val assembler = new VectorAssembler()
  .setInputCols(Array("age", "income", "credit_score"))
  .setOutputCol("features")

// StringIndexer
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

// OneHotEncoder
val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex"))
  .setOutputCols(Array("categoryVec"))

// StandardScaler
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

// PCA
val pca = new PCA()
  .setK(3)
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
```

## Classification

### Logistic Regression

Binary and multiclass classification.

```python theme={null}
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create classifier
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.8
)

# Train model
lrModel = lr.fit(train)

# Model coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Make predictions
predictions = lrModel.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
```

### Decision Trees

```python theme={null}
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    maxBins=32,
    impurity="gini"
)

dtModel = dt.fit(train)

# Feature importance
print(f"Feature Importance: {dtModel.featureImportances}")

# Tree structure
print(f"Tree:\n{dtModel.toDebugString}")
```

### Random Forest

```python theme={null}
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    maxBins=32,
    seed=42
)

rfModel = rf.fit(train)

# Get predictions
predictions = rfModel.transform(test)

# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")
```

### Gradient-Boosted Trees

```python theme={null}
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    maxDepth=5,
    stepSize=0.1
)

gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
```

### Naive Bayes

```python theme={null}
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    smoothing=1.0,
    modelType="multinomial"
)

nbModel = nb.fit(train)
predictions = nbModel.transform(test)
```

### Multilayer Perceptron (Neural Network)

```python theme={null}
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Specify layers: input, hidden layers, output
layers = [4, 5, 4, 3]  # 4 features, 2 hidden layers, 3 classes

mlp = MultilayerPerceptronClassifier(
    featuresCol="features",
    labelCol="label",
    layers=layers,
    blockSize=128,
    seed=42,
    maxIter=100
)

mlpModel = mlp.fit(train)
predictions = mlpModel.transform(test)
```

### Scala Classification

```scala theme={null}
import org.apache.spark.ml.classification._

// Logistic Regression
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(10)
  .setRegParam(0.01)

// Random Forest
val rf = new RandomForestClassifier()
  .setNumTrees(100)
  .setMaxDepth(10)
  .setFeaturesCol("features")
  .setLabelCol("label")

// GBT
val gbt = new GBTClassifier()
  .setMaxIter(10)
  .setMaxDepth(5)
  .setFeaturesCol("features")
  .setLabelCol("label")
```

## Regression

### Linear Regression

```python theme={null}
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create regressor
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8
)

# Train model
lrModel = lr.fit(train)

# Print coefficients and intercept
print(f"Coefficients: {lrModel.coefficients}")
print(f"Intercept: {lrModel.intercept}")

# Training summary
trainingSummary = lrModel.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"R2: {trainingSummary.r2}")
print(f"MAE: {trainingSummary.meanAbsoluteError}")

# Make predictions
predictions = lrModel.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse}")
```

### Decision Tree Regression

```python theme={null}
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="label",
    maxDepth=5,
    maxBins=32
)

dtModel = dt.fit(train)
predictions = dtModel.transform(test)
```

### Random Forest Regression

```python theme={null}
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    seed=42
)

rfModel = rf.fit(train)
predictions = rfModel.transform(test)

# Feature importance
print(f"Feature Importance: {rfModel.featureImportances}")
```

### Gradient-Boosted Trees Regression

```python theme={null}
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    maxDepth=5,
    stepSize=0.1
)

gbtModel = gbt.fit(train)
predictions = gbtModel.transform(test)
```

### Generalized Linear Regression

```python theme={null}
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(
    featuresCol="features",
    labelCol="label",
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3
)

glrModel = glr.fit(train)
predictions = glrModel.transform(test)

# Summary
summary = glrModel.summary
print(f"Coefficient Standard Errors: {summary.coefficientStandardErrors}")
print(f"T Values: {summary.tValues}")
print(f"P Values: {summary.pValues}")
```

## Clustering

### K-Means

```python theme={null}
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Create clusterer
kmeans = KMeans(
    featuresCol="features",
    k=3,
    seed=42,
    maxIter=20
)

# Train model
kmeansModel = kmeans.fit(df)

# Get cluster centers
centers = kmeansModel.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)

# Make predictions
predictions = kmeansModel.transform(df)

# Evaluate clustering
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette"
)

silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

# Within Set Sum of Squared Errors
wssse = kmeansModel.summary.trainingCost
print(f"WSSSE: {wssse}")
```
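The training cost reported above is the sum of squared distances from each point to its nearest cluster center. The plain-Python sketch below computes that quantity by hand for a made-up dataset, assuming Euclidean distance; `squared_distance` and `wssse` are hypothetical helpers, not MLlib functions.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def wssse(points, centers):
    # Each point contributes its squared distance to the nearest center.
    return sum(min(squared_distance(p, c) for c in centers) for p in points)

points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]

one_center = [(5.0, 1.0)]
two_centers = [(0.0, 1.0), (10.0, 1.0)]

print(wssse(points, one_center))   # 104.0
print(wssse(points, two_centers))  # 4.0
```

The cost always decreases as `k` grows, so it is usually read alongside the silhouette score or an elbow plot, not minimized on its own.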

### Bisecting K-Means

```python theme={null}
from pyspark.ml.clustering import BisectingKMeans

bkm = BisectingKMeans(
    featuresCol="features",
    k=5,
    maxIter=20,
    seed=42
)

bkmModel = bkm.fit(df)
predictions = bkmModel.transform(df)

# Cluster centers
centers = bkmModel.clusterCenters()
```

### Gaussian Mixture Model

```python theme={null}
from pyspark.ml.clustering import GaussianMixture

gmm = GaussianMixture(
    featuresCol="features",
    k=3,
    maxIter=20,
    seed=42
)

gmmModel = gmm.fit(df)

# Get predictions with probabilities
predictions = gmmModel.transform(df)
predictions.select("features", "prediction", "probability").show()

# Gaussians
# show() prints the table itself and returns None, so call it directly
print("Gaussians:")
gmmModel.gaussiansDF.show(truncate=False)
```

### LDA (Latent Dirichlet Allocation)

Topic modeling for text.

```python theme={null}
from pyspark.ml.clustering import LDA

lda = LDA(
    featuresCol="features",
    k=10,  # Number of topics
    maxIter=10,
    seed=42
)

ldaModel = lda.fit(corpus)

# Topics
topics = ldaModel.describeTopics(maxTermsPerTopic=10)
topics.show(truncate=False)

# Transform documents
transformed = ldaModel.transform(corpus)
```

## Collaborative Filtering

### ALS (Alternating Least Squares)

Recommendation system using matrix factorization.

```python theme={null}
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Load ratings data
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)

# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Create ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1,
    alpha=1.0  # only used when implicitPrefs=True
)

# Train model
alsModel = als.fit(train)

# Make predictions
predictions = alsModel.transform(test)

# Evaluate
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# Generate recommendations
# Top 10 movie recommendations for each user
userRecs = alsModel.recommendForAllUsers(10)
userRecs.show(truncate=False)

# Top 10 user recommendations for each movie
movieRecs = alsModel.recommendForAllItems(10)
movieRecs.show(truncate=False)

# Recommendations for specific users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = alsModel.recommendForUserSubset(users, 10)
userSubsetRecs.show(truncate=False)
```
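Under the hood, a matrix-factorization model predicts a rating as the dot product of a user factor vector and an item factor vector. The plain-Python sketch below illustrates that step and the cold-start case; the factor values and IDs are made up, and `predict` is a hypothetical helper, not part of the ALS API.

```python
# Hypothetical rank-2 factors; a trained model would learn these from ratings.
user_factors = {1: [1.0, 0.5], 2: [0.5, 1.5]}
item_factors = {10: [2.0, 1.0], 20: [1.0, 2.0]}

def predict(user_id, item_id):
    # Unknown users or items have no factors: the cold-start case.
    if user_id not in user_factors or item_id not in item_factors:
        return float("nan")
    u, v = user_factors[user_id], item_factors[item_id]
    return sum(a * b for a, b in zip(u, v))

print(predict(1, 10))  # 2.5
print(predict(2, 20))  # 3.5
print(predict(3, 10))  # nan
```

With `coldStartStrategy="drop"`, rows whose prediction is NaN are removed from the output of `transform`, which keeps the RMSE from evaluating to NaN.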

### Scala ALS

```scala theme={null}
import org.apache.spark.ml.recommendation.ALS

val ratings = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/ratings.csv")

val Array(train, test) = ratings.randomSplit(Array(0.8, 0.2), seed = 42)

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setMaxIter(10)
  .setRegParam(0.1)
  .setRank(10)
  .setColdStartStrategy("drop")

val model = als.fit(train)

// Generate recommendations
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)
```

## Model Evaluation

### Classification Metrics

```python theme={null}
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator
)

# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Change metric
binary_evaluator.setMetricName("areaUnderPR")
aupr = binary_evaluator.evaluate(predictions)
print(f"AUPR: {aupr}")

# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

# Accuracy
multi_evaluator.setMetricName("accuracy")
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# F1 Score
multi_evaluator.setMetricName("f1")
f1 = multi_evaluator.evaluate(predictions)
print(f"F1: {f1}")

# Weighted Precision
multi_evaluator.setMetricName("weightedPrecision")
precision = multi_evaluator.evaluate(predictions)
print(f"Precision: {precision}")

# Weighted Recall
multi_evaluator.setMetricName("weightedRecall")
recall = multi_evaluator.evaluate(predictions)
print(f"Recall: {recall}")
```

### Regression Metrics

```python theme={null}
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction"
)

# RMSE
evaluator.setMetricName("rmse")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# MSE
evaluator.setMetricName("mse")
mse = evaluator.evaluate(predictions)
print(f"MSE: {mse}")

# MAE
evaluator.setMetricName("mae")
mae = evaluator.evaluate(predictions)
print(f"MAE: {mae}")

# R2
evaluator.setMetricName("r2")
r2 = evaluator.evaluate(predictions)
print(f"R2: {r2}")
```
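For reference, the four metrics can be computed by hand on a small example. The sketch below is plain Python with made-up labels and predictions; it uses the standard definitions of each metric and does not call MLlib.

```python
import math

labels = [3.0, 5.0, 7.0, 9.0]
preds = [2.0, 5.0, 8.0, 9.0]

n = len(labels)
errors = [p - y for p, y in zip(preds, labels)]

mse = sum(e ** 2 for e in errors) / n
rmse = math.sqrt(mse)
mae = sum(abs(e) for e in errors) / n

# R2 compares the residual error with the variance around the label mean.
mean_label = sum(labels) / n
ss_res = sum(e ** 2 for e in errors)
ss_tot = sum((y - mean_label) ** 2 for y in labels)
r2 = 1 - ss_res / ss_tot

print(mse, mae, r2)    # 0.5 0.5 0.9
print(round(rmse, 4))  # 0.7071
```

RMSE and MAE are in the units of the label, while R2 is unitless, which makes it easier to compare across datasets.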

### Confusion Matrix

```python theme={null}
from pyspark.mllib.evaluation import MulticlassMetrics

# Convert to an RDD of (prediction, label) float pairs for mllib metrics
predictionAndLabels = predictions.select("prediction", "label") \
    .rdd.map(lambda row: (float(row[0]), float(row[1])))

metrics = MulticlassMetrics(predictionAndLabels)

# Confusion matrix
print(f"Confusion Matrix:\n{metrics.confusionMatrix()}")

# Overall statistics
print(f"Accuracy: {metrics.accuracy}")
print(f"Weighted Precision: {metrics.weightedPrecision}")
print(f"Weighted Recall: {metrics.weightedRecall}")
print(f"Weighted F1: {metrics.weightedFMeasure()}")

# Per-class metrics
labels = predictions.select("label").distinct().rdd.map(lambda x: x[0]).collect()
for label in sorted(labels):
    print(f"\nClass {label}:")
    print(f"  Precision: {metrics.precision(label)}")
    print(f"  Recall: {metrics.recall(label)}")
    print(f"  F1: {metrics.fMeasure(label)}")
```
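To show how the confusion matrix relates to the per-class metrics, here is a plain-Python sketch for the binary case. The `(prediction, label)` pairs are made up, and the code uses the textbook definitions without calling MLlib. Rows are actual classes and columns are predicted classes, both in ascending label order.

```python
pairs = [  # (prediction, label)
    (1.0, 1.0), (1.0, 1.0), (0.0, 1.0),
    (0.0, 0.0), (0.0, 0.0), (1.0, 0.0), (0.0, 0.0),
]

tp = sum(1 for p, y in pairs if p == 1.0 and y == 1.0)
fp = sum(1 for p, y in pairs if p == 1.0 and y == 0.0)
fn = sum(1 for p, y in pairs if p == 0.0 and y == 1.0)
tn = sum(1 for p, y in pairs if p == 0.0 and y == 0.0)

# Rows are actual classes, columns are predicted classes.
confusion = [[tn, fp], [fn, tp]]
print(confusion)  # [[3, 1], [1, 2]]

# Metrics for the positive class (label 1.0).
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)
accuracy = (tp + tn) / len(pairs)

print(round(precision, 3), round(recall, 3), round(f1, 3))  # 0.667 0.667 0.667
print(round(accuracy, 3))                                   # 0.714
```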

## Hyperparameter Tuning

### Cross-Validation

```python theme={null}
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .addGrid(lr.maxIter, [10, 20, 50]) \
    .build()

# Create evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    metricName="areaUnderROC"
)

# Create cross-validator
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=42
)

# Train with cross-validation
cvModel = cv.fit(train)

# Best model (param getters on fitted models require Spark 3.0+)
bestModel = cvModel.bestModel
print(f"Best regParam: {bestModel.getRegParam()}")
print(f"Best elasticNetParam: {bestModel.getElasticNetParam()}")
print(f"Best maxIter: {bestModel.getMaxIter()}")

# Average metrics for each parameter combination
avgMetrics = cvModel.avgMetrics
for params, metric in zip(paramGrid, avgMetrics):
    print(f"{params} -> {metric}")

# Make predictions with best model
predictions = cvModel.transform(test)
```
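Grid search cost grows multiplicatively, so it helps to count the fits before launching a job. The plain-Python sketch below enumerates a grid with the same values as the cross-validation example and counts how many models a 5-fold search would train. It does not call Spark; `grid`, `combos`, and `fits` are illustrative names.

```python
from itertools import product

grid = {
    "regParam": [0.01, 0.1, 0.5],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "maxIter": [10, 20, 50],
}
num_folds = 5

# One dict per parameter combination, like the output of ParamGridBuilder.
combos = [dict(zip(grid, values)) for values in product(*grid.values())]

# Each combination is trained once per fold, then the best one is refit
# on the full training set.
fits = len(combos) * num_folds + 1

print(len(combos))  # 27
print(fits)         # 136
```

If that is too expensive, shrinking the grid or setting the `parallelism` parameter on `CrossValidator` to evaluate several combinations concurrently are the usual first steps.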

### Train-Validation Split

Faster alternative to cross-validation.

```python theme={null}
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Reuses `lr` and `evaluator` from the cross-validation example

# Build parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Create train-validation split
tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    trainRatio=0.8,
    seed=42
)

# Train
tvsModel = tvs.fit(train)

# Best model
bestModel = tvsModel.bestModel

# Make predictions
predictions = tvsModel.transform(test)
```

### Scala Hyperparameter Tuning

```scala theme={null}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 0.5))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .addGrid(lr.maxIter, Array(10, 20, 50))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setMetricName("areaUnderROC")

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)
  .setSeed(42)

val cvModel = cv.fit(train)
val predictions = cvModel.transform(test)
```

## Model Persistence

### Save and Load Models

```python theme={null}
# Save pipeline model
model.write().overwrite().save("/models/lr_pipeline")

# Load pipeline model
from pyspark.ml import PipelineModel
loaded_model = PipelineModel.load("/models/lr_pipeline")

# Save individual model
lrModel.write().overwrite().save("/models/logistic_regression")

# Load individual model
from pyspark.ml.classification import LogisticRegressionModel
loaded_lr = LogisticRegressionModel.load("/models/logistic_regression")

# Make predictions with loaded model
predictions = loaded_model.transform(test)
```

### Scala Model Persistence

```scala theme={null}
// Save model
model.write.overwrite().save("/models/lr_pipeline")

// Load model
import org.apache.spark.ml.PipelineModel
val loadedModel = PipelineModel.load("/models/lr_pipeline")

// Use loaded model
val predictions = loadedModel.transform(test)
```

## Real-World Use Cases

### Credit Risk Prediction

```python theme={null}
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Load credit data
credit = spark.read.csv("/data/credit.csv", header=True, inferSchema=True)

# Feature engineering
categorical_cols = ["employment_status", "loan_purpose", "home_ownership"]
numeric_cols = ["age", "income", "loan_amount", "credit_score", "debt_to_income"]

# Index categorical variables
indexers = [
    StringIndexer(inputCol=col, outputCol=col+"_index")
    for col in categorical_cols
]

# Assemble features
assembler = VectorAssembler(
    inputCols=[col+"_index" for col in categorical_cols] + numeric_cols,
    outputCol="features"
)

# Scale features
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures"
)

# Index label
labelIndexer = StringIndexer(inputCol="default", outputCol="label")

# Random Forest classifier
rf = RandomForestClassifier(
    featuresCol="scaledFeatures",
    labelCol="label",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=indexers + [assembler, scaler, labelIndexer, rf])

# Split data
train, test = credit.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train)

# Evaluate
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Feature importance
rfModel = model.stages[-1]
feature_names = [col+"_index" for col in categorical_cols] + numeric_cols
importances = list(zip(feature_names, rfModel.featureImportances.toArray()))
sorted_importances = sorted(importances, key=lambda x: x[1], reverse=True)

print("\nFeatures by importance:")
for feature, importance in sorted_importances:
    print(f"{feature}: {importance:.4f}")
```

### Customer Segmentation

```python theme={null}
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import avg, count

# Load customer data
customers = spark.read.csv("/data/customers.csv", header=True, inferSchema=True)

# Select features
feature_cols = [
    "recency", "frequency", "monetary_value",
    "avg_order_value", "customer_lifetime_value"
]

# Assemble and scale
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")

# Prepare data
features_df = assembler.transform(customers)
scaled_df = scaler.fit(features_df).transform(features_df)

# Find optimal k using elbow method
wssses = []
for k in range(2, 11):
    kmeans = KMeans(featuresCol="scaledFeatures", k=k, seed=42)
    model = kmeans.fit(scaled_df)
    wssse = model.summary.trainingCost
    wssses.append((k, wssse))
    print(f"k={k}, WSSSE={wssse}")

# Train final model with chosen k
kmeans = KMeans(featuresCol="scaledFeatures", k=5, seed=42)
kmeansModel = kmeans.fit(scaled_df)

# Get cluster assignments
clustered = kmeansModel.transform(scaled_df)

# Analyze clusters
cluster_stats = clustered.groupBy("prediction").agg(
    *[avg(col).alias(f"avg_{col}") for col in feature_cols],
    count("*").alias("count")
)
cluster_stats.show()
```

### Movie Recommendation System

```python theme={null}
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

# Load ratings
ratings = spark.read.csv("/data/ratings.csv", header=True, inferSchema=True)
movies = spark.read.csv("/data/movies.csv", header=True, inferSchema=True)

# Split data
train, test = ratings.randomSplit([0.8, 0.2], seed=42)

# Build ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    rank=10,
    maxIter=10,
    regParam=0.1
)

# Train
model = als.fit(train)

# Evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

# Generate top 10 recommendations for each user
userRecs = model.recommendForAllUsers(10)

# Show recommendations with movie titles
user_recs_exploded = userRecs.select(
    "userId",
    explode("recommendations").alias("rec")
).select(
    "userId",
    col("rec.movieId").alias("movieId"),
    col("rec.rating").alias("predicted_rating")
).join(movies, "movieId")

user_recs_exploded.filter(col("userId") == 1).show(10, truncate=False)
```

### Churn Prediction

```python theme={null}
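from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col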

# Load customer data
customers = spark.read.csv("/data/customer_churn.csv", header=True, inferSchema=True)

# Feature columns
feature_cols = [
    "tenure", "monthly_charges", "total_charges",
    "num_services", "customer_service_calls",
    "contract_type_index", "payment_method_index"
]

# Index categorical features
contract_indexer = StringIndexer(
    inputCol="contract_type",
    outputCol="contract_type_index"
)
payment_indexer = StringIndexer(
    inputCol="payment_method",
    outputCol="payment_method_index"
)

# Assemble features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Index label
label_indexer = StringIndexer(inputCol="churn", outputCol="label")

# GBT Classifier
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=20,
    maxDepth=5
)

# Pipeline
pipeline = Pipeline(stages=[
    contract_indexer,
    payment_indexer,
    assembler,
    label_indexer,
    gbt
])

# Train
train, test = customers.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)

# Predict
predictions = model.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

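# Show high-risk customers, ranked by predicted churn probability.
# "probability" is a vector column and cannot be sorted directly, so extract
# the class-1 probability first (vector_to_array requires Spark 3.0+).
from pyspark.ml.functions import vector_to_array

high_risk = predictions.filter(col("prediction") == 1.0) \
    .withColumn("churn_probability", vector_to_array(col("probability"))[1]) \
    .select("customer_id", "churn_probability", "prediction") \
    .orderBy(col("churn_probability").desc())
high_risk.show(20)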
```

## Performance Tips

### Data Caching

```python theme={null}
# Cache training data for iterative algorithms
train.cache()
model = rf.fit(train)
train.unpersist()
```

### Partition Tuning

```python theme={null}
from pyspark.sql.functions import col

# Repartition for better parallelism
df = df.repartition(200)

# Coalesce to reduce partitions after filtering
filtered = df.filter(col("active") == True).coalesce(50)
```

### Broadcast Variables

```python theme={null}
from pyspark.sql.functions import broadcast

# Broadcast small dimension tables in joins
enriched = transactions.join(broadcast(products), "product_id")
```

### Dimensionality Reduction

```python theme={null}
# Reduce feature dimensions with PCA
from pyspark.ml.feature import PCA

pca = PCA(k=50, inputCol="features", outputCol="pcaFeatures")
reduced = pca.fit(df).transform(df)
```

### Sampling for Development

```python theme={null}
# Use sampling for faster iteration during development
sample = df.sample(fraction=0.1, seed=42)
model = pipeline.fit(sample)
```

## Common Pitfalls

### Issue 1: Imbalanced Classes

**Problem**: Model biased toward majority class.

**Solution**:

```python theme={null}
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, when

# Calculate class weights (inverse of class frequency)
total = train.count()
positive = train.filter(col("label") == 1).count()
negative = train.filter(col("label") == 0).count()

weight_positive = total / (2.0 * positive)
weight_negative = total / (2.0 * negative)

# Add weight column
weighted = train.withColumn(
    "weight",
    when(col("label") == 1, weight_positive).otherwise(weight_negative)
)

# Use weighted training
lr = LogisticRegression(weightCol="weight")
lrModel = lr.fit(weighted)
```
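A quick worked example shows why these weights balance the classes. The plain-Python sketch below uses made-up counts (1,000 rows, 100 of them positive) and applies the same formula; after weighting, both classes contribute the same total weight to the loss.

```python
# Hypothetical class counts for illustration.
total, positive = 1000, 100
negative = total - positive

weight_positive = total / (2.0 * positive)   # 5.0
weight_negative = total / (2.0 * negative)   # about 0.556

# After weighting, each class contributes the same total weight.
print(positive * weight_positive)            # 500.0
print(round(negative * weight_negative, 6))  # 500.0
```

The weights also sum to the original row count, so the overall scale of the loss, and therefore the effect of `regParam`, stays comparable to the unweighted case.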

### Issue 2: Data Leakage

**Problem**: Using future information in features.

**Solution**:

```python theme={null}
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col

# Split data by time, not randomly
train = df.filter(col("date") < "2023-01-01")
test = df.filter(col("date") >= "2023-01-01")

# Fit transformers only on training data
scaler = StandardScaler(inputCol="features", outputCol="scaled")
scaler_model = scaler.fit(train)  # Fit only on train
train_scaled = scaler_model.transform(train)
test_scaled = scaler_model.transform(test)
```

### Issue 3: Missing Values

**Problem**: Algorithms fail on null values.

**Solution**:

```python theme={null}
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=["age", "income"],
    outputCols=["age_imputed", "income_imputed"],
    strategy="mean"  # or "median"; "mode" requires Spark 3.1+
)

imputed = imputer.fit(df).transform(df)
```
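As a minimal illustration of the `"mean"` strategy, the plain-Python sketch below fills a missing value with the mean of the observed values in the same column. The data is made up and the code does not call MLlib; it only shows the arithmetic.

```python
ages = [25.0, None, 35.0, 60.0]

# The mean is computed over observed values only.
observed = [a for a in ages if a is not None]
mean_age = sum(observed) / len(observed)

imputed = [mean_age if a is None else a for a in ages]
print(imputed)  # [25.0, 40.0, 35.0, 60.0]
```

As with scalers, the imputation statistic should be learned from the training split only and then applied to the test split.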

### Issue 4: Categorical Variables

**Problem**: Not encoding categorical features.

**Solution**:

```python theme={null}
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer

# Always index and encode categoricals
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])

# Or use VectorIndexer for automatic handling
vector_indexer = VectorIndexer(
    inputCol="features",
    outputCol="indexed",
    maxCategories=10  # Features with <= 10 distinct values are categorical
)
```

## Hands-On Exercises

### Exercise 1: Build a Classification Pipeline

Create a complete pipeline for binary classification.

```python theme={null}
# TODO: Implement classification pipeline
# 1. Load and explore data
# 2. Handle missing values
# 3. Encode categorical variables
# 4. Scale numerical features
# 5. Train multiple models
# 6. Compare performance
# 7. Tune hyperparameters

# Your code here
```

### Exercise 2: Customer Segmentation

Perform clustering analysis on customer data.

```python theme={null}
# TODO: Customer segmentation
# 1. Load customer transaction data
# 2. Engineer RFM features
# 3. Find optimal number of clusters
# 4. Train K-means model
# 5. Analyze cluster characteristics
# 6. Visualize results

# Your code here
```

### Exercise 3: Recommendation System

Build a movie recommendation system.

```python theme={null}
# TODO: Build recommender
# 1. Load ratings data
# 2. Train ALS model
# 3. Tune hyperparameters
# 4. Generate recommendations
# 5. Evaluate with RMSE
# 6. Handle cold-start problem

# Your code here
```

## Summary

MLlib provides a comprehensive suite of distributed machine learning algorithms:

* **Pipelines**: Streamline ML workflows with transformers and estimators
* **Feature Engineering**: Rich set of transformers for data preparation
* **Algorithms**: Classification, regression, clustering, and recommendations
* **Evaluation**: Comprehensive metrics for model assessment
* **Tuning**: Cross-validation and parameter grid search
* **Scalability**: Handle datasets too large for single machines

### Key Takeaways

1. Always use pipelines for reproducible ML workflows
2. Properly handle categorical variables with indexing and encoding
3. Scale features for better algorithm performance
4. Use cross-validation for robust hyperparameter tuning
5. Cache data for iterative algorithms
6. Monitor and address class imbalance
7. Prevent data leakage with proper train/test splits

### Next Steps

* Practice building end-to-end ML pipelines
* Experiment with hyperparameter tuning
* Explore feature engineering techniques
* Learn production model deployment
* Study advanced ensemble methods

***

<Note>
  Continue to the next module to master performance tuning and optimization techniques.
</Note>
