The Problem: We have a sparse matrix with mostly missing values. How do we predict the "?" values?

The Solution: Find hidden "taste factors" that explain the patterns!
# ---------------------------------------------------------------------------
# Build a synthetic music-streaming ratings dataset with a known low-rank
# structure, then hide most entries so we can test matrix-factorization
# recovery against ground truth.
# ---------------------------------------------------------------------------
import numpy as np
import pandas as pd
from scipy.sparse.linalg import svds
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

# Fixed seed so every downstream number (ratings, masks, noise) is reproducible.
np.random.seed(42)

# Simulate a music streaming dataset
n_users = 500
n_songs = 200

# Create user and song names
users = [f"user_{i}" for i in range(n_users)]
songs = [f"song_{i}" for i in range(n_songs)]

# Create some "ground truth" hidden factors
# These are the TRUE taste dimensions (we'll try to recover them)
n_factors = 5

# User taste profiles (how much each user likes each hidden factor)
user_factors = np.random.randn(n_users, n_factors) * 0.5

# Song characteristics (how much each song has each hidden factor)
song_factors = np.random.randn(n_factors, n_songs) * 0.5

# True ratings = user_factors @ song_factors + noise
true_ratings = user_factors @ song_factors

# Convert to 1-5 scale (min-max rescale onto [1, 5])
true_ratings = (true_ratings - true_ratings.min()) / (true_ratings.max() - true_ratings.min()) * 4 + 1

# Create sparse observations (users only rate some songs)
# Each user rates about 5-15% of songs
observed_mask = np.random.random((n_users, n_songs)) < 0.1

# Add some noise to observed ratings
noise = np.random.randn(n_users, n_songs) * 0.3
observed_ratings = true_ratings + noise
observed_ratings = np.clip(observed_ratings, 1, 5)

# Replace unobserved with NaN — NaN marks "missing" throughout the notebook.
ratings_matrix = np.where(observed_mask, observed_ratings, np.nan)

print(f"Dataset size: {n_users} users × {n_songs} songs")
print(f"Observed ratings: {observed_mask.sum():,} ({observed_mask.mean()*100:.1f}%)")
print(f"Missing ratings: {(~observed_mask).sum():,} ({(~observed_mask).mean()*100:.1f}%)")

# Create a DataFrame for easier manipulation
ratings_df = pd.DataFrame(ratings_matrix, index=users, columns=songs)
print("\nSample of ratings matrix:")
print(ratings_df.iloc[:5, :5].round(1))
# Analyze the ratings: value distribution plus per-user / per-song coverage.
observed_values = ratings_matrix[~np.isnan(ratings_matrix)]
ratings_per_user = (~np.isnan(ratings_matrix)).sum(axis=1)
ratings_per_song = (~np.isnan(ratings_matrix)).sum(axis=0)

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# One (data, bins, xlabel, ylabel, title) spec per panel.
panel_specs = [
    (observed_values, 20, 'Rating', 'Count', 'Distribution of Ratings'),
    (ratings_per_user, 30, 'Number of Rated Songs', 'Number of Users', 'Ratings per User'),
    (ratings_per_song, 30, 'Number of Ratings', 'Number of Songs', 'Ratings per Song'),
]
for ax, (data, n_bins, x_label, y_label, title) in zip(axes, panel_specs):
    ax.hist(data, bins=n_bins, edgecolor='black', alpha=0.7)
    ax.set_xlabel(x_label)
    ax.set_ylabel(y_label)
    ax.set_title(title)

# Mark the mean rating on the first panel only.
axes[0].axvline(observed_values.mean(), color='red', linestyle='--',
                label=f'Mean: {observed_values.mean():.2f}')
axes[0].legend()

plt.tight_layout()
plt.show()

print(f"\nAverage rating: {observed_values.mean():.2f}")
print(f"Std of ratings: {observed_values.std():.2f}")
print(f"Avg ratings per user: {ratings_per_user.mean():.1f}")
print(f"Avg ratings per song: {ratings_per_song.mean():.1f}")
For SVD, we need a complete matrix, so we first fill the missing values with each user's average rating.
def fill_missing_values(ratings_matrix):
    """
    Fill missing (NaN) ratings with each user's average observed rating.

    Users who rated nothing fall back to the global mean rating. This is a
    simple baseline - more sophisticated methods exist!

    Parameters
    ----------
    ratings_matrix : np.ndarray, shape (n_users, n_songs)
        Ratings with NaN marking unobserved entries.

    Returns
    -------
    np.ndarray
        New array of the same shape with every NaN replaced; the input is
        left unmodified.
    """
    missing = np.isnan(ratings_matrix)
    # Per-user observed count and sum (zeros substituted so the sum ignores NaNs).
    counts = (~missing).sum(axis=1)
    sums = np.where(missing, 0.0, ratings_matrix).sum(axis=1)
    global_mean = np.nanmean(ratings_matrix)
    # Row mean where the user rated anything, global mean otherwise.
    # np.maximum(counts, 1) avoids a 0/0 warning for all-NaN rows; this
    # vectorized form also skips the per-row Python loop and the
    # RuntimeWarning np.nanmean emits on empty slices.
    user_means = np.where(counts > 0, sums / np.maximum(counts, 1), global_mean)
    # Broadcast each user's mean across their missing entries.
    return np.where(missing, user_means[:, None], ratings_matrix)


# Fill missing values
filled_ratings = fill_missing_values(ratings_matrix)

print("Before filling:")
print(ratings_matrix[:3, :5].round(1))
print("\nAfter filling:")
print(filled_ratings[:3, :5].round(1))
def predict_rating(user_idx, song_idx, U, sigma, Vt, user_means):
    """
    Predict a single user-song rating from the truncated SVD factors.

    Parameters
    ----------
    user_idx, song_idx : int
        Row (user) and column (song) indices into the ratings matrix.
    U, sigma, Vt : np.ndarray
        Truncated SVD factors of the (mean-adjusted) filled ratings matrix.
    user_means : np.ndarray
        Per-user mean rating added back to the low-rank reconstruction.

    Returns
    -------
    float
        Predicted rating, clipped to the valid [1, 5] range.
    """
    # Elementwise scaling by sigma — equivalent to U[i] @ diag(sigma) but
    # avoids materializing a k×k diagonal matrix on every call.
    user_factor = U[user_idx, :] * sigma
    song_factor = Vt[:, song_idx]
    # Predicted rating = dot product + user mean
    pred = np.dot(user_factor, song_factor) + user_means[user_idx]
    # Clip to valid range
    return np.clip(pred, 1, 5)


def get_recommendations(user_idx, ratings_matrix, U, sigma, Vt, user_means, n_recs=10):
    """
    Get the top n_recs song recommendations for a user.

    Only songs the user has NOT already rated are considered.

    Returns
    -------
    list[tuple[int, float]]
        (song_idx, predicted_rating) pairs, highest prediction first.
    """
    rated_songs = ~np.isnan(ratings_matrix[user_idx, :])
    # Predict the whole row at once: (u ⊙ σ) @ Vt + mean, clipped to [1, 5].
    # One vectorized product instead of a Python loop over every song.
    all_preds = np.clip((U[user_idx, :] * sigma) @ Vt + user_means[user_idx], 1, 5)
    predictions = [(song_idx, all_preds[song_idx])
                   for song_idx in np.flatnonzero(~rated_songs)]
    # Sort by predicted rating (descending) and return top N
    predictions.sort(key=lambda x: x[1], reverse=True)
    return predictions[:n_recs]


# Get recommendations for a sample user.
# NOTE(review): U, sigma, Vt, user_means come from the SVD cell (not shown
# in this excerpt) — confirm the factors were computed on the mean-centered
# filled matrix, since user_means is added back here.
sample_user = 42
print(f"Getting recommendations for User {sample_user}...")
print(f"User has rated {(~np.isnan(ratings_matrix[sample_user, :])).sum()} songs\n")

# Show their existing ratings
existing_ratings = []
for song_idx in range(n_songs):
    if not np.isnan(ratings_matrix[sample_user, song_idx]):
        existing_ratings.append((song_idx, ratings_matrix[sample_user, song_idx]))
existing_ratings.sort(key=lambda x: x[1], reverse=True)

print("User's top-rated songs:")
for song_idx, rating in existing_ratings[:5]:
    print(f" {songs[song_idx]}: {rating:.1f}/5.0")

# Get recommendations
recommendations = get_recommendations(sample_user, ratings_matrix, U, sigma, Vt, user_means, n_recs=10)

print(f"\nTop 10 Recommended Songs:")
for rank, (song_idx, pred_rating) in enumerate(recommendations, 1):
    true_rating = true_ratings[sample_user, song_idx]  # Our ground truth
    print(f" {rank}. {songs[song_idx]}: Predicted {pred_rating:.2f}/5.0 (True: {true_rating:.2f})")
from sklearn.metrics.pairwise import cosine_similarity


def find_similar_users(user_idx, U, n_similar=5):
    """Return the n_similar users whose latent taste vectors are closest
    (by cosine similarity) to the given user, as (index, similarity) pairs."""
    query = U[user_idx, :].reshape(1, -1)
    sims = cosine_similarity(query, U)[0]
    # Rank all users best-first, then drop rank 0 (the user themself).
    ranked = np.argsort(sims)[::-1]
    neighbours = ranked[1:n_similar + 1]
    return [(j, sims[j]) for j in neighbours]


def find_similar_songs(song_idx, Vt, n_similar=5):
    """Return the n_similar songs whose latent characteristic vectors are
    closest (by cosine similarity) to the given song, as (index, similarity) pairs."""
    query = Vt[:, song_idx].reshape(1, -1)
    # Rows of Vt.T are per-song factor vectors, matching the query layout.
    sims = cosine_similarity(query, Vt.T)[0]
    ranked = np.argsort(sims)[::-1]
    neighbours = ranked[1:n_similar + 1]
    return [(j, sims[j]) for j in neighbours]


# Find similar users
sample_user = 42
similar_users = find_similar_users(sample_user, U, n_similar=5)
print(f"Users similar to User {sample_user}:")
for user_idx, similarity in similar_users:
    print(f" User {user_idx}: {similarity:.3f} similarity")

# Find similar songs
sample_song = 15
similar_songs = find_similar_songs(sample_song, Vt, n_similar=5)
print(f"\nSongs similar to Song {sample_song}:")
for song_idx, similarity in similar_songs:
    print(f" Song {song_idx}: {similarity:.3f} similarity")
Congratulations! You’ve built a working recommendation engine using the same fundamental math that powers Netflix, Spotify, and Amazon. The concepts you’ve learned - matrix factorization, similarity measures, and latent factors - are the foundation of modern personalization systems.