Sequence-to-Sequence: Transforming Sequences
The Seq2Seq Paradigm
Seq2Seq models transform one sequence into another:
- Machine Translation: “Hello world” → “Bonjour monde”
- Summarization: Long document → Short summary
- Question Answering: Context + Question → Answer
- Code Generation: Description → Code
- Speech Recognition: Audio → Text
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from typing import List, Tuple, Optional
torch.manual_seed(42)
Basic Encoder-Decoder Architecture
The Encoder
The encoder reads the input sequence and compresses it into a context vector:
class Encoder(nn.Module):
"""LSTM-based encoder for seq2seq."""
def __init__(
self,
vocab_size: int,
embed_dim: int,
hidden_dim: int,
num_layers: int = 2,
dropout: float = 0.1
):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.dropout = nn.Dropout(dropout)
self.lstm = nn.LSTM(
embed_dim,
hidden_dim,
num_layers=num_layers,
bidirectional=True,
dropout=dropout if num_layers > 1 else 0,
batch_first=True
)
# Project bidirectional hidden state to decoder size
self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)
def forward(self, src: torch.Tensor, src_lengths: torch.Tensor):
"""
Args:
src: [batch_size, src_len] - source token ids
src_lengths: [batch_size] - actual lengths (for packing)
Returns:
outputs: [batch_size, src_len, hidden_dim * 2]
hidden: (h_n, c_n) each [num_layers, batch_size, hidden_dim]
"""
# Embed and apply dropout
embedded = self.dropout(self.embedding(src))
# Pack for efficient computation
packed = nn.utils.rnn.pack_padded_sequence(
embedded, src_lengths.cpu(),
batch_first=True, enforce_sorted=False
)
# Forward through LSTM
outputs, (hidden, cell) = self.lstm(packed)
# Unpack
outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
# Combine bidirectional hidden states
# hidden: [num_layers * 2, batch_size, hidden_dim]
# Reshape: [num_layers, 2, batch_size, hidden_dim]
num_layers = hidden.size(0) // 2
batch_size = hidden.size(1)
hidden_dim = hidden.size(2)
hidden = hidden.view(num_layers, 2, batch_size, hidden_dim)
cell = cell.view(num_layers, 2, batch_size, hidden_dim)
# Concatenate forward and backward
hidden = torch.cat([hidden[:, 0], hidden[:, 1]], dim=2) # [num_layers, batch_size, hidden_dim * 2]
cell = torch.cat([cell[:, 0], cell[:, 1]], dim=2)
# Project to decoder size
hidden = torch.tanh(self.fc_hidden(hidden))
cell = torch.tanh(self.fc_cell(cell))
return outputs, (hidden, cell)
# Test encoder
encoder = Encoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
src = torch.randint(0, 10000, (32, 20)) # batch_size=32, src_len=20
src_lengths = torch.randint(10, 20, (32,))
outputs, (hidden, cell) = encoder(src, src_lengths)
print(f"Encoder outputs: {outputs.shape}") # [32, 20, 1024]
print(f"Hidden state: {hidden.shape}") # [2, 32, 512]
The Decoder
The decoder generates the output sequence one token at a time, conditioning each step on the previous token and its hidden state:
class Decoder(nn.Module):
"""LSTM decoder with optional attention."""
def __init__(
self,
vocab_size: int,
embed_dim: int,
hidden_dim: int,
num_layers: int = 2,
dropout: float = 0.1
):
super().__init__()
self.vocab_size = vocab_size
self.hidden_dim = hidden_dim
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.dropout = nn.Dropout(dropout)
self.lstm = nn.LSTM(
embed_dim,
hidden_dim,
num_layers=num_layers,
dropout=dropout if num_layers > 1 else 0,
batch_first=True
)
self.fc_out = nn.Linear(hidden_dim, vocab_size)
def forward(
self,
tgt: torch.Tensor,
hidden: Tuple[torch.Tensor, torch.Tensor],
encoder_outputs: Optional[torch.Tensor] = None
):
"""
Args:
tgt: [batch_size, 1] - current target token
hidden: (h, c) from previous step
encoder_outputs: [batch_size, src_len, hidden*2] (optional, for attention)
Returns:
output: [batch_size, vocab_size] - logits
hidden: updated hidden state
"""
# Embed input token
embedded = self.dropout(self.embedding(tgt))
# Forward through LSTM
output, hidden = self.lstm(embedded, hidden)
# Project to vocabulary
prediction = self.fc_out(output.squeeze(1))
return prediction, hidden
# Test decoder
decoder = Decoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
tgt_token = torch.randint(0, 10000, (32, 1)) # Single token
prediction, new_hidden = decoder(tgt_token, (hidden, cell))
print(f"Prediction: {prediction.shape}") # [32, 10000]
Complete Seq2Seq Model
class Seq2Seq(nn.Module):
"""Complete sequence-to-sequence model."""
def __init__(
self,
encoder: Encoder,
decoder: Decoder,
device: torch.device
):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
def forward(
self,
src: torch.Tensor,
src_lengths: torch.Tensor,
tgt: torch.Tensor,
teacher_forcing_ratio: float = 0.5
):
"""
Args:
src: [batch_size, src_len]
src_lengths: [batch_size]
tgt: [batch_size, tgt_len]
teacher_forcing_ratio: probability of using ground truth
Returns:
outputs: [batch_size, tgt_len, vocab_size]
"""
batch_size = src.size(0)
tgt_len = tgt.size(1)
vocab_size = self.decoder.vocab_size
# Encode source sequence
encoder_outputs, hidden = self.encoder(src, src_lengths)
# Store decoder outputs
outputs = torch.zeros(batch_size, tgt_len, vocab_size).to(self.device)
# First input is <SOS> token
input_token = tgt[:, 0:1]
for t in range(1, tgt_len):
# Decode one step
output, hidden = self.decoder(input_token, hidden, encoder_outputs)
outputs[:, t, :] = output
# Teacher forcing
use_teacher_forcing = random.random() < teacher_forcing_ratio
if use_teacher_forcing:
input_token = tgt[:, t:t+1]
else:
input_token = output.argmax(dim=1, keepdim=True)
return outputs
def generate(
self,
src: torch.Tensor,
src_lengths: torch.Tensor,
max_len: int = 50,
sos_token: int = 1,
eos_token: int = 2
) -> torch.Tensor:
"""Generate output sequence using greedy decoding."""
batch_size = src.size(0)
# Encode
encoder_outputs, hidden = self.encoder(src, src_lengths)
# Initialize with SOS
input_token = torch.full((batch_size, 1), sos_token, device=self.device)
generated = [input_token]
for _ in range(max_len):
output, hidden = self.decoder(input_token, hidden, encoder_outputs)
input_token = output.argmax(dim=1, keepdim=True)
generated.append(input_token)
# Check if all sequences have generated EOS
if (input_token == eos_token).all():
break
return torch.cat(generated, dim=1)
# Create full model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(10000, 256, 512, 2).to(device)
decoder = Decoder(10000, 256, 512, 2).to(device)
model = Seq2Seq(encoder, decoder, device)
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
Attention Mechanisms
The Information Bottleneck Problem
The basic encoder-decoder forces all source information through a single fixed-size context vector, which becomes a bottleneck for long inputs. Attention solves this by letting the decoder “look at” different parts of the input at every decoding step.
Bahdanau Attention (Additive)
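In equations (a sketch of what the module below computes, with h_i the encoder output at position i and s_t the current decoder hidden state):

e_{t,i} = v^\top \tanh(W_{enc} h_i + W_{dec} s_t), \qquad \alpha_{t,i} = \mathrm{softmax}_i(e_{t,i}), \qquad c_t = \sum_i \alpha_{t,i} h_i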
class BahdanauAttention(nn.Module):
"""Additive attention mechanism."""
def __init__(self, encoder_dim: int, decoder_dim: int, attention_dim: int):
super().__init__()
# Learnable attention weights
self.W_encoder = nn.Linear(encoder_dim, attention_dim)
self.W_decoder = nn.Linear(decoder_dim, attention_dim)
self.v = nn.Linear(attention_dim, 1, bias=False)
def forward(
self,
encoder_outputs: torch.Tensor,
decoder_hidden: torch.Tensor,
mask: Optional[torch.Tensor] = None
):
"""
Args:
encoder_outputs: [batch_size, src_len, encoder_dim]
decoder_hidden: [batch_size, decoder_dim]
mask: [batch_size, src_len] - True for padding positions
Returns:
context: [batch_size, encoder_dim]
attention_weights: [batch_size, src_len]
"""
src_len = encoder_outputs.size(1)
# Project encoder outputs
encoder_proj = self.W_encoder(encoder_outputs) # [batch, src_len, attention_dim]
# Project decoder hidden (repeat for each source position)
decoder_proj = self.W_decoder(decoder_hidden).unsqueeze(1) # [batch, 1, attention_dim]
decoder_proj = decoder_proj.repeat(1, src_len, 1) # [batch, src_len, attention_dim]
# Compute attention scores
energy = torch.tanh(encoder_proj + decoder_proj) # [batch, src_len, attention_dim]
attention_scores = self.v(energy).squeeze(-1) # [batch, src_len]
# Apply mask (set padding positions to -inf)
if mask is not None:
attention_scores = attention_scores.masked_fill(mask, float('-inf'))
# Normalize with softmax
attention_weights = F.softmax(attention_scores, dim=1)
# Weighted sum of encoder outputs
context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)
context = context.squeeze(1) # [batch, encoder_dim]
return context, attention_weights
# Test
attention = BahdanauAttention(1024, 512, 256)
encoder_outputs = torch.randn(32, 20, 1024)
decoder_hidden = torch.randn(32, 512)
context, weights = attention(encoder_outputs, decoder_hidden)
print(f"Context: {context.shape}") # [32, 1024]
print(f"Attention weights: {weights.shape}") # [32, 20]
print(f"Weights sum: {weights.sum(dim=1)}") # Should be ~1.0
Luong Attention (Multiplicative)
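The scoring variants implemented below differ only in how the raw score between encoder output h_i and decoder state s_t is computed:

score(h_i, s_t) = s_t^\top h_i                   (dot; requires encoder_dim == decoder_dim)
score(h_i, s_t) = s_t^\top (W h_i)               (general)
score(h_i, s_t) = v^\top \tanh(W [h_i ; s_t])    (concat)

The scores are then masked, softmax-normalized, and used to form the context exactly as in Bahdanau attention.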
class LuongAttention(nn.Module):
"""Multiplicative attention (faster than Bahdanau)."""
def __init__(self, encoder_dim: int, decoder_dim: int, method: str = 'general'):
super().__init__()
self.method = method
if method == 'general':
self.W = nn.Linear(encoder_dim, decoder_dim, bias=False)
elif method == 'concat':
self.W = nn.Linear(encoder_dim + decoder_dim, decoder_dim)
self.v = nn.Linear(decoder_dim, 1, bias=False)
# 'dot' method needs no parameters
def forward(
self,
encoder_outputs: torch.Tensor,
decoder_hidden: torch.Tensor,
mask: Optional[torch.Tensor] = None
):
"""
Args:
encoder_outputs: [batch_size, src_len, encoder_dim]
decoder_hidden: [batch_size, decoder_dim]
"""
if self.method == 'dot':
# Requires encoder_dim == decoder_dim
attention_scores = torch.bmm(
encoder_outputs,
decoder_hidden.unsqueeze(2)
).squeeze(2)
elif self.method == 'general':
# Project encoder outputs to decoder dimension
energy = self.W(encoder_outputs) # [batch, src_len, decoder_dim]
attention_scores = torch.bmm(
energy,
decoder_hidden.unsqueeze(2)
).squeeze(2)
elif self.method == 'concat':
src_len = encoder_outputs.size(1)
decoder_expanded = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)
concat = torch.cat([encoder_outputs, decoder_expanded], dim=2)
energy = torch.tanh(self.W(concat))
attention_scores = self.v(energy).squeeze(2)
# Apply mask and normalize
if mask is not None:
attention_scores = attention_scores.masked_fill(mask, float('-inf'))
attention_weights = F.softmax(attention_scores, dim=1)
context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
return context, attention_weights
# Compare attention methods
print("Attention Method Comparison")
print("="*50)
for method in ['dot', 'general', 'concat']:
if method == 'dot':
attn = LuongAttention(512, 512, method) # Same dimensions for dot
enc_out = torch.randn(32, 20, 512)
dec_hidden = torch.randn(32, 512)
else:
attn = LuongAttention(1024, 512, method)
enc_out = torch.randn(32, 20, 1024)
dec_hidden = torch.randn(32, 512)
context, weights = attn(enc_out, dec_hidden)
params = sum(p.numel() for p in attn.parameters())
print(f"{method}: {params:,} parameters")
Attention Decoder
class AttentionDecoder(nn.Module):
"""Decoder with attention mechanism."""
def __init__(
self,
vocab_size: int,
embed_dim: int,
encoder_dim: int,
hidden_dim: int,
attention: nn.Module,
num_layers: int = 2,
dropout: float = 0.1
):
super().__init__()
self.vocab_size = vocab_size
self.attention = attention
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.dropout = nn.Dropout(dropout)
# LSTM input: embedding + context
self.lstm = nn.LSTM(
embed_dim + encoder_dim,
hidden_dim,
num_layers=num_layers,
dropout=dropout if num_layers > 1 else 0,
batch_first=True
)
# Output: hidden + context
self.fc_out = nn.Linear(hidden_dim + encoder_dim, vocab_size)
def forward(
self,
tgt: torch.Tensor,
hidden: Tuple[torch.Tensor, torch.Tensor],
encoder_outputs: torch.Tensor,
mask: Optional[torch.Tensor] = None
):
"""
Args:
tgt: [batch_size, 1]
hidden: (h, c)
encoder_outputs: [batch_size, src_len, encoder_dim]
mask: [batch_size, src_len]
"""
# Embed input
embedded = self.dropout(self.embedding(tgt)) # [batch, 1, embed_dim]
# Compute attention (use last layer hidden state)
h_t = hidden[0][-1] # [batch, hidden_dim]
context, attention_weights = self.attention(encoder_outputs, h_t, mask)
# Concatenate embedding with context
lstm_input = torch.cat([embedded, context.unsqueeze(1)], dim=2)
# Forward through LSTM
output, hidden = self.lstm(lstm_input, hidden)
output = output.squeeze(1) # [batch, hidden_dim]
# Concatenate LSTM output with context for prediction
combined = torch.cat([output, context], dim=1)
prediction = self.fc_out(combined)
return prediction, hidden, attention_weights
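A minimal shape check for the attention decoder, using random tensors in place of real encoder state (a sketch mirroring the earlier tests; dimensions match the bidirectional encoder above, so encoder_dim = 1024):
# Test attention decoder with stand-in encoder state
attn = BahdanauAttention(encoder_dim=1024, decoder_dim=512, attention_dim=256)
attn_decoder = AttentionDecoder(
    vocab_size=10000, embed_dim=256, encoder_dim=1024,
    hidden_dim=512, attention=attn, num_layers=2
)
enc_out = torch.randn(32, 20, 1024)                              # pretend encoder outputs
dec_hidden = (torch.randn(2, 32, 512), torch.randn(2, 32, 512))  # (h, c)
tgt_token = torch.randint(0, 10000, (32, 1))
prediction, new_hidden, attn_weights = attn_decoder(tgt_token, dec_hidden, enc_out)
print(f"Prediction: {prediction.shape}")           # [32, 10000]
print(f"Attention weights: {attn_weights.shape}")  # [32, 20]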
Beam Search Decoding
Greedy decoding commits to the single most likely token at every step and can miss higher-probability sequences overall. Beam search keeps several partial hypotheses and expands them in parallel:
class BeamSearchDecoder:
"""Beam search for sequence generation."""
def __init__(
self,
model: nn.Module,
beam_size: int = 5,
max_len: int = 50,
length_penalty: float = 0.6,
sos_token: int = 1,
eos_token: int = 2
):
self.model = model
self.beam_size = beam_size
self.max_len = max_len
self.length_penalty = length_penalty
self.sos_token = sos_token
self.eos_token = eos_token
def decode(
self,
src: torch.Tensor,
src_lengths: torch.Tensor
) -> List[Tuple[torch.Tensor, float]]:
"""
Perform beam search decoding.
Returns:
List of (sequence, score) tuples for each input
"""
device = src.device
batch_size = src.size(0)
# Encode source
encoder_outputs, (hidden, cell) = self.model.encoder(src, src_lengths)
# Process each example in batch separately
results = []
for b in range(batch_size):
# Get encoder outputs for this example
enc_out_b = encoder_outputs[b:b+1] # [1, src_len, hidden*2]
h_b = hidden[:, b:b+1, :].contiguous() # [layers, 1, hidden]
c_b = cell[:, b:b+1, :].contiguous()
# Initialize beams
beams = [(
[self.sos_token], # tokens
(h_b, c_b), # hidden state
0.0 # log probability
)]
completed = []
for step in range(self.max_len):
all_candidates = []
for tokens, (h, c), score in beams:
if tokens[-1] == self.eos_token:
completed.append((tokens, score))
continue
# Get last token
last_token = torch.tensor([[tokens[-1]]], device=device)
# Decode one step
output, (new_h, new_c) = self.model.decoder(
last_token, (h, c), enc_out_b
)
# Get top-k next tokens
log_probs = F.log_softmax(output, dim=1)
top_probs, top_indices = log_probs.topk(self.beam_size)
for i in range(self.beam_size):
new_tokens = tokens + [top_indices[0, i].item()]
new_score = score + top_probs[0, i].item()
all_candidates.append((
new_tokens,
(new_h, new_c),
new_score
))
if not all_candidates:
break
# Select top-k beams with length normalization
all_candidates.sort(
key=lambda x: x[2] / (len(x[0]) ** self.length_penalty),
reverse=True
)
beams = all_candidates[:self.beam_size]
# Add remaining beams to completed
completed.extend([(tokens, score) for tokens, _, score in beams])
# Sort by normalized score
completed.sort(
key=lambda x: x[1] / (len(x[0]) ** self.length_penalty),
reverse=True
)
# Return best sequence
if completed:
best_tokens, best_score = completed[0]
results.append((torch.tensor(best_tokens), best_score))
else:
results.append((torch.tensor([self.sos_token]), 0.0))
return results
# Example usage
beam_decoder = BeamSearchDecoder(
model=model,
beam_size=5,
max_len=50,
length_penalty=0.6
)
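A usage sketch with dummy inputs (the model is untrained, so the output tokens are meaningless; this only demonstrates the decoding flow):
src = torch.randint(0, 10000, (4, 12)).to(device)
src_lengths = torch.randint(5, 12, (4,))
with torch.no_grad():
    hypotheses = beam_decoder.decode(src, src_lengths)
for seq, score in hypotheses:
    print(f"length={seq.size(0)}, score={score:.2f}")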
Diverse Beam Search
class DiverseBeamSearch(BeamSearchDecoder):
"""Beam search with diversity penalty."""
def __init__(
self,
model: nn.Module,
beam_size: int = 5,
num_groups: int = 5,
diversity_penalty: float = 0.5,
**kwargs
):
super().__init__(model, beam_size, **kwargs)
self.num_groups = num_groups
self.diversity_penalty = diversity_penalty
def decode(self, src, src_lengths):
"""
Diverse beam search produces multiple diverse outputs.
"""
# Split beams into groups
beams_per_group = self.beam_size // self.num_groups
all_results = []
previous_tokens = set()
for group in range(self.num_groups):
# Standard beam search with diversity penalty
results = self._beam_search_with_penalty(
src, src_lengths, previous_tokens
)
# Add to previous tokens for diversity
for tokens, _ in results:
previous_tokens.update(set(tokens.tolist()))
all_results.extend(results)
return all_results
def _beam_search_with_penalty(self, src, src_lengths, previous_tokens):
# Implementation similar to base beam search
# but with penalty for tokens in previous_tokens
pass
Training Techniques
Label Smoothing
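The loss below replaces the one-hot target with a smoothed distribution; with vocabulary size V and smoothing value ε (the target and padding slots are excluded from the uniform mass):

q(y) = 1 - ε            if y is the target
q(y) = ε / (V - 2)      otherwise, with q(<PAD>) = 0
loss = -\sum_y q(y) \log p(y), averaged over non-padding positions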
class LabelSmoothingLoss(nn.Module):
"""Cross-entropy with label smoothing."""
def __init__(self, vocab_size: int, smoothing: float = 0.1, padding_idx: int = 0):
super().__init__()
self.vocab_size = vocab_size
self.smoothing = smoothing
self.padding_idx = padding_idx
self.confidence = 1.0 - smoothing
def forward(self, predictions: torch.Tensor, targets: torch.Tensor):
"""
Args:
predictions: [batch*seq_len, vocab_size]
targets: [batch*seq_len]
"""
# Create smoothed target distribution
with torch.no_grad():
true_dist = torch.zeros_like(predictions)
true_dist.fill_(self.smoothing / (self.vocab_size - 2)) # -2 for padding and target
true_dist.scatter_(1, targets.unsqueeze(1), self.confidence)
true_dist[:, self.padding_idx] = 0
# Zero out padding positions
mask = (targets == self.padding_idx)
true_dist[mask] = 0
# KL divergence
log_probs = F.log_softmax(predictions, dim=-1)
loss = -(true_dist * log_probs).sum(dim=-1)
# Average over non-padding positions
non_pad = (~mask).float()
loss = (loss * non_pad).sum() / non_pad.sum()
return loss
# Test
criterion = LabelSmoothingLoss(vocab_size=10000, smoothing=0.1)
pred = torch.randn(32 * 20, 10000)
target = torch.randint(0, 10000, (32 * 20,))
loss = criterion(pred, target)
print(f"Label smoothing loss: {loss.item():.4f}")
Scheduled Sampling
Scheduled sampling gradually reduces the teacher forcing ratio during training, so the model increasingly conditions on its own predictions, as it must at inference time:
class ScheduledSamplingTrainer:
"""Trainer with scheduled sampling."""
def __init__(
self,
model: nn.Module,
optimizer: torch.optim.Optimizer,
criterion: nn.Module,
initial_tf_ratio: float = 1.0,
decay_rate: float = 0.99,
min_tf_ratio: float = 0.1
):
self.model = model
self.optimizer = optimizer
self.criterion = criterion
self.tf_ratio = initial_tf_ratio
self.decay_rate = decay_rate
self.min_tf_ratio = min_tf_ratio
def train_step(self, src, src_lengths, tgt):
self.model.train()
self.optimizer.zero_grad()
# Forward with current teacher forcing ratio
outputs = self.model(src, src_lengths, tgt, self.tf_ratio)
# Compute loss (ignore first token which is SOS)
outputs = outputs[:, 1:].reshape(-1, outputs.size(-1))
targets = tgt[:, 1:].reshape(-1)
loss = self.criterion(outputs, targets)
loss.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
return loss.item()
def update_tf_ratio(self):
"""Decay teacher forcing ratio after each epoch."""
self.tf_ratio = max(
self.min_tf_ratio,
self.tf_ratio * self.decay_rate
)
return self.tf_ratio
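A minimal sketch of a training loop driving the trainer; data_loader and num_epochs are hypothetical placeholders (any iterable yielding padded (src, src_lengths, tgt) batches), and a padding index of 0 is an assumption:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = LabelSmoothingLoss(vocab_size=10000, smoothing=0.1, padding_idx=0)  # assumes <PAD> = 0
trainer = ScheduledSamplingTrainer(model, optimizer, criterion)

for epoch in range(num_epochs):                    # num_epochs: hypothetical
    epoch_loss = 0.0
    for src, src_lengths, tgt in data_loader:      # data_loader: hypothetical
        epoch_loss += trainer.train_step(src.to(device), src_lengths, tgt.to(device))
    tf_ratio = trainer.update_tf_ratio()
    print(f"Epoch {epoch}: loss={epoch_loss / len(data_loader):.4f}, tf_ratio={tf_ratio:.2f}")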
Practical Applications
Machine Translation
class TranslationModel:
"""Complete translation pipeline."""
def __init__(
self,
model: Seq2Seq,
src_tokenizer,
tgt_tokenizer,
beam_size: int = 5
):
self.model = model
self.src_tokenizer = src_tokenizer
self.tgt_tokenizer = tgt_tokenizer
self.beam_decoder = BeamSearchDecoder(model, beam_size=beam_size)
def translate(self, text: str) -> str:
"""Translate a single sentence."""
self.model.eval()
# Tokenize source
src_tokens = self.src_tokenizer.encode(text)
src = torch.tensor([src_tokens]).to(self.model.device)
src_length = torch.tensor([len(src_tokens)])
with torch.no_grad():
results = self.beam_decoder.decode(src, src_length)
# Decode best result
best_tokens, score = results[0]
translation = self.tgt_tokenizer.decode(best_tokens.tolist())
return translation
def translate_batch(self, texts: List[str]) -> List[str]:
"""Translate multiple sentences."""
self.model.eval()
# Tokenize and pad
src_tokens = [self.src_tokenizer.encode(t) for t in texts]
max_len = max(len(t) for t in src_tokens)
src = torch.zeros(len(texts), max_len, dtype=torch.long)
src_lengths = torch.zeros(len(texts), dtype=torch.long)
for i, tokens in enumerate(src_tokens):
src[i, :len(tokens)] = torch.tensor(tokens)
src_lengths[i] = len(tokens)
src = src.to(self.model.device)
with torch.no_grad():
results = self.beam_decoder.decode(src, src_lengths)
translations = []
for tokens, score in results:
translation = self.tgt_tokenizer.decode(tokens.tolist())
translations.append(translation)
return translations
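Usage would look roughly like this; src_tokenizer and tgt_tokenizer are hypothetical objects standing in for any tokenizer with encode(str) -> List[int] and decode(List[int]) -> str:
# Hypothetical tokenizers, not defined in this section
translator = TranslationModel(model, src_tokenizer, tgt_tokenizer, beam_size=5)
print(translator.translate("Hello world"))
print(translator.translate_batch(["Hello world", "How are you?"]))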
Text Summarization
class SummarizationModel:
"""Abstractive summarization with copy mechanism."""
def __init__(self, model, tokenizer, max_summary_len=100):
self.model = model
self.tokenizer = tokenizer
self.max_summary_len = max_summary_len
def summarize(
self,
text: str,
min_length: int = 30,
max_length: int = 100,
num_beams: int = 4,
no_repeat_ngram_size: int = 3
) -> str:
"""Generate summary with constraints."""
# Tokenize
src_tokens = self.tokenizer.encode(text)
src = torch.tensor([src_tokens]).to(self.model.device)
src_length = torch.tensor([len(src_tokens)])
        # Beam search with constraints (note: no_repeat_ngram_size is accepted but not enforced by this simple decoder)
beam_decoder = BeamSearchDecoder(
self.model,
beam_size=num_beams,
max_len=max_length
)
with torch.no_grad():
results = beam_decoder.decode(src, src_length)
# Filter by minimum length
valid_results = [
(tokens, score) for tokens, score in results
if len(tokens) >= min_length
]
if not valid_results:
valid_results = results
best_tokens, _ = valid_results[0]
summary = self.tokenizer.decode(best_tokens.tolist())
return summary
Evaluation Metrics
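The BLEU implementation below follows the standard corpus-level formula: modified n-gram precisions p_n for n = 1..4, combined by a geometric mean and scaled by a brevity penalty:

\mathrm{BLEU} = \mathrm{BP} \cdot \exp\Big(\tfrac{1}{4} \sum_{n=1}^{4} \log p_n\Big), \qquad \mathrm{BP} = \min\big(1, e^{\,1 - r/c}\big)

where c is the total length of the predictions and r the (shortest) reference length.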
class Seq2SeqMetrics:
"""Evaluation metrics for seq2seq models."""
@staticmethod
def bleu_score(predictions: List[str], references: List[List[str]]) -> dict:
"""
Compute BLEU score.
Args:
predictions: List of predicted sentences
references: List of reference lists (multiple refs per prediction)
"""
from collections import Counter
def ngram_counts(tokens, n):
return Counter(tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1))
def modified_precision(pred, refs, n):
pred_counts = ngram_counts(pred.split(), n)
max_ref_counts = Counter()
for ref in refs:
ref_counts = ngram_counts(ref.split(), n)
for ngram, count in ref_counts.items():
max_ref_counts[ngram] = max(max_ref_counts[ngram], count)
clipped = sum(min(count, max_ref_counts[ngram])
for ngram, count in pred_counts.items())
total = sum(pred_counts.values())
return clipped / total if total > 0 else 0
# Compute precision for n=1,2,3,4
precisions = []
for n in range(1, 5):
p = np.mean([modified_precision(pred, refs, n)
for pred, refs in zip(predictions, references)])
precisions.append(p)
# Brevity penalty
pred_len = sum(len(p.split()) for p in predictions)
ref_len = sum(min(len(r.split()) for r in refs) for refs in references)
bp = np.exp(1 - ref_len / pred_len) if pred_len < ref_len else 1.0
# Geometric mean
log_precisions = [np.log(p + 1e-10) for p in precisions]
bleu = bp * np.exp(sum(log_precisions) / 4)
return {
'bleu': bleu * 100,
'bleu-1': precisions[0] * 100,
'bleu-2': precisions[1] * 100,
'bleu-3': precisions[2] * 100,
'bleu-4': precisions[3] * 100,
'brevity_penalty': bp
}
@staticmethod
def rouge_score(predictions: List[str], references: List[str]) -> dict:
"""Compute ROUGE-L score."""
def lcs(x, y):
"""Longest common subsequence."""
m, n = len(x), len(y)
dp = [[0] * (n + 1) for _ in range(m + 1)]
for i in range(1, m + 1):
for j in range(1, n + 1):
if x[i-1] == y[j-1]:
dp[i][j] = dp[i-1][j-1] + 1
else:
dp[i][j] = max(dp[i-1][j], dp[i][j-1])
return dp[m][n]
rouge_scores = []
for pred, ref in zip(predictions, references):
pred_tokens = pred.split()
ref_tokens = ref.split()
lcs_len = lcs(pred_tokens, ref_tokens)
precision = lcs_len / len(pred_tokens) if pred_tokens else 0
recall = lcs_len / len(ref_tokens) if ref_tokens else 0
f1 = 2 * precision * recall / (precision + recall + 1e-10)
rouge_scores.append(f1)
return {'rouge-l': np.mean(rouge_scores) * 100}
# Example
metrics = Seq2SeqMetrics()
predictions = ["the cat sat on the mat", "hello world"]
references = [["the cat is on the mat", "a cat sat on a mat"], ["hello world", "hi world"]]
bleu = metrics.bleu_score(predictions, references)
print(f"BLEU Score: {bleu}")
rouge = metrics.rouge_score(predictions, [refs[0] for refs in references])
print(f"ROUGE-L: {rouge}")
Exercises
Exercise 1: Implement Copy Mechanism
Add a pointer-generator network to copy words from the source:
class CopyMechanism(nn.Module):
    def forward(self, context, decoder_state, source):
        # Compute copy probability (gate between generating and copying)
        p_copy = torch.sigmoid(self.W_copy(torch.cat([context, decoder_state], dim=-1)))
        # Blend vocabulary and copy distributions
        # (p_vocab: decoder softmax; p_copy_dist: attention weights scattered onto source token ids)
        p_final = (1 - p_copy) * p_vocab + p_copy * p_copy_dist
        return p_final
Exercise 2: Add Coverage Mechanism
Prevent repetition by tracking attention history:
class CoverageAttention(nn.Module):
def forward(self, encoder_outputs, decoder_hidden, coverage):
# coverage: sum of attention weights from previous steps
coverage_penalty = self.W_coverage(coverage)
attention = standard_attention + coverage_penalty
Exercise 3: Implement Nucleus Sampling
Top-p sampling for more diverse generation:
def nucleus_sampling(logits, p=0.9, temperature=1.0):
probs = F.softmax(logits / temperature, dim=-1)
sorted_probs, sorted_indices = torch.sort(probs, descending=True)
cumsum = torch.cumsum(sorted_probs, dim=-1)
# Find nucleus
nucleus = cumsum < p
# Sample from nucleus