
Sequence-to-Sequence: Transforming Sequences

The Seq2Seq Paradigm

Seq2Seq models transform one sequence into another:
  • Machine Translation: “Hello world” → “Bonjour monde”
  • Summarization: Long document → Short summary
  • Question Answering: Context + Question → Answer
  • Code Generation: Description → Code
  • Speech Recognition: Audio → Text
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from typing import List, Tuple, Optional

torch.manual_seed(42)

Basic Encoder-Decoder Architecture

The Encoder

The encoder reads the input sequence and compresses it into a context vector:
class Encoder(nn.Module):
    """LSTM-based encoder for seq2seq."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Project bidirectional hidden state to decoder size
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)
    
    def forward(self, src: torch.Tensor, src_lengths: torch.Tensor):
        """
        Args:
            src: [batch_size, src_len] - source token ids
            src_lengths: [batch_size] - actual lengths (for packing)
        
        Returns:
            outputs: [batch_size, src_len, hidden_dim * 2]
            hidden: (h_n, c_n) each [num_layers, batch_size, hidden_dim]
        """
        # Embed and apply dropout
        embedded = self.dropout(self.embedding(src))
        
        # Pack for efficient computation
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, src_lengths.cpu(), 
            batch_first=True, enforce_sorted=False
        )
        
        # Forward through LSTM
        outputs, (hidden, cell) = self.lstm(packed)
        
        # Unpack
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
        
        # Combine bidirectional hidden states
        # hidden: [num_layers * 2, batch_size, hidden_dim]
        # Reshape: [num_layers, 2, batch_size, hidden_dim]
        num_layers = hidden.size(0) // 2
        batch_size = hidden.size(1)
        hidden_dim = hidden.size(2)
        
        hidden = hidden.view(num_layers, 2, batch_size, hidden_dim)
        cell = cell.view(num_layers, 2, batch_size, hidden_dim)
        
        # Concatenate forward and backward
        hidden = torch.cat([hidden[:, 0], hidden[:, 1]], dim=2)  # [num_layers, batch_size, hidden_dim * 2]
        cell = torch.cat([cell[:, 0], cell[:, 1]], dim=2)
        
        # Project to decoder size
        hidden = torch.tanh(self.fc_hidden(hidden))
        cell = torch.tanh(self.fc_cell(cell))
        
        return outputs, (hidden, cell)


# Test encoder
encoder = Encoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
src = torch.randint(0, 10000, (32, 20))  # batch_size=32, src_len=20
src_lengths = torch.randint(10, 21, (32,))
src_lengths[0] = 20  # keep one full-length sequence so the padded outputs span src_len

outputs, (hidden, cell) = encoder(src, src_lengths)
print(f"Encoder outputs: {outputs.shape}")  # [32, 20, 1024]
print(f"Hidden state: {hidden.shape}")  # [2, 32, 512]

The Decoder

The decoder generates the output sequence one token at a time:
class Decoder(nn.Module):
    """LSTM decoder with optional attention."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
    
    def forward(
        self,
        tgt: torch.Tensor,
        hidden: Tuple[torch.Tensor, torch.Tensor],
        encoder_outputs: Optional[torch.Tensor] = None
    ):
        """
        Args:
            tgt: [batch_size, 1] - current target token
            hidden: (h, c) from previous step
            encoder_outputs: [batch_size, src_len, hidden*2] (optional, for attention)
        
        Returns:
            output: [batch_size, vocab_size] - logits
            hidden: updated hidden state
        """
        # Embed input token
        embedded = self.dropout(self.embedding(tgt))
        
        # Forward through LSTM
        output, hidden = self.lstm(embedded, hidden)
        
        # Project to vocabulary
        prediction = self.fc_out(output.squeeze(1))
        
        return prediction, hidden


# Test decoder
decoder = Decoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
tgt_token = torch.randint(0, 10000, (32, 1))  # Single token

prediction, new_hidden = decoder(tgt_token, (hidden, cell))
print(f"Prediction: {prediction.shape}")  # [32, 10000]

Complete Seq2Seq Model

class Seq2Seq(nn.Module):
    """Complete sequence-to-sequence model."""
    
    def __init__(
        self,
        encoder: Encoder,
        decoder: Decoder,
        device: torch.device
    ):
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor,
        tgt: torch.Tensor,
        teacher_forcing_ratio: float = 0.5
    ):
        """
        Args:
            src: [batch_size, src_len]
            src_lengths: [batch_size]
            tgt: [batch_size, tgt_len]
            teacher_forcing_ratio: probability of using ground truth
        
        Returns:
            outputs: [batch_size, tgt_len, vocab_size]
        """
        batch_size = src.size(0)
        tgt_len = tgt.size(1)
        vocab_size = self.decoder.vocab_size
        
        # Encode source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)
        
        # Store decoder outputs
        outputs = torch.zeros(batch_size, tgt_len, vocab_size).to(self.device)
        
        # First input is <SOS> token
        input_token = tgt[:, 0:1]
        
        for t in range(1, tgt_len):
            # Decode one step
            output, hidden = self.decoder(input_token, hidden, encoder_outputs)
            outputs[:, t, :] = output
            
            # Teacher forcing
            use_teacher_forcing = random.random() < teacher_forcing_ratio
            
            if use_teacher_forcing:
                input_token = tgt[:, t:t+1]
            else:
                input_token = output.argmax(dim=1, keepdim=True)
        
        return outputs
    
    def generate(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor,
        max_len: int = 50,
        sos_token: int = 1,
        eos_token: int = 2
    ) -> torch.Tensor:
        """Generate output sequence using greedy decoding."""
        
        batch_size = src.size(0)
        
        # Encode
        encoder_outputs, hidden = self.encoder(src, src_lengths)
        
        # Initialize with SOS
        input_token = torch.full((batch_size, 1), sos_token, dtype=torch.long, device=self.device)
        
        generated = [input_token]
        
        for _ in range(max_len):
            output, hidden = self.decoder(input_token, hidden, encoder_outputs)
            input_token = output.argmax(dim=1, keepdim=True)
            generated.append(input_token)
            
            # Check if all sequences have generated EOS
            if (input_token == eos_token).all():
                break
        
        return torch.cat(generated, dim=1)


# Create full model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(10000, 256, 512, 2).to(device)
decoder = Decoder(10000, 256, 512, 2).to(device)
model = Seq2Seq(encoder, decoder, device)

print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")

Attention Mechanisms

The Information Bottleneck Problem

The basic encoder-decoder forces all information about the source through a single fixed-size context vector, which becomes a bottleneck for long inputs. Attention removes that bottleneck by letting the decoder “look at” different parts of the input at every decoding step.
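
Concretely, each decoding step builds a fresh context as a weighted average of all encoder states, with the weights recomputed per output token. A toy sketch of that weighted sum (random scores stand in for the learned attention scores defined below):
# Toy illustration: context vector as a weighted sum of encoder states
encoder_states = torch.randn(1, 6, 8)   # [batch=1, src_len=6, dim=8]
scores = torch.randn(1, 6)              # stand-in for learned attention scores
weights = F.softmax(scores, dim=1)      # distribution over source positions
context = torch.bmm(weights.unsqueeze(1), encoder_states).squeeze(1)  # [1, 8]
print(weights.sum(dim=1))  # each row sums to 1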

Bahdanau Attention (Additive)

class BahdanauAttention(nn.Module):
    """Additive attention mechanism."""
    
    def __init__(self, encoder_dim: int, decoder_dim: int, attention_dim: int):
        super().__init__()
        
        # Learnable attention weights
        self.W_encoder = nn.Linear(encoder_dim, attention_dim)
        self.W_decoder = nn.Linear(decoder_dim, attention_dim)
        self.v = nn.Linear(attention_dim, 1, bias=False)
    
    def forward(
        self,
        encoder_outputs: torch.Tensor,
        decoder_hidden: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            encoder_outputs: [batch_size, src_len, encoder_dim]
            decoder_hidden: [batch_size, decoder_dim]
            mask: [batch_size, src_len] - True for padding positions
        
        Returns:
            context: [batch_size, encoder_dim]
            attention_weights: [batch_size, src_len]
        """
        src_len = encoder_outputs.size(1)
        
        # Project encoder outputs
        encoder_proj = self.W_encoder(encoder_outputs)  # [batch, src_len, attention_dim]
        
        # Project decoder hidden (repeat for each source position)
        decoder_proj = self.W_decoder(decoder_hidden).unsqueeze(1)  # [batch, 1, attention_dim]
        decoder_proj = decoder_proj.repeat(1, src_len, 1)  # [batch, src_len, attention_dim]
        
        # Compute attention scores
        energy = torch.tanh(encoder_proj + decoder_proj)  # [batch, src_len, attention_dim]
        attention_scores = self.v(energy).squeeze(-1)  # [batch, src_len]
        
        # Apply mask (set padding positions to -inf)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
        # Normalize with softmax
        attention_weights = F.softmax(attention_scores, dim=1)
        
        # Weighted sum of encoder outputs
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)
        context = context.squeeze(1)  # [batch, encoder_dim]
        
        return context, attention_weights


# Test
attention = BahdanauAttention(1024, 512, 256)
encoder_outputs = torch.randn(32, 20, 1024)
decoder_hidden = torch.randn(32, 512)

context, weights = attention(encoder_outputs, decoder_hidden)
print(f"Context: {context.shape}")  # [32, 1024]
print(f"Attention weights: {weights.shape}")  # [32, 20]
print(f"Weights sum: {weights.sum(dim=1)}")  # Should be ~1.0

Luong Attention (Multiplicative)

class LuongAttention(nn.Module):
    """Multiplicative attention (faster than Bahdanau)."""
    
    def __init__(self, encoder_dim: int, decoder_dim: int, method: str = 'general'):
        super().__init__()
        
        self.method = method
        
        if method == 'general':
            self.W = nn.Linear(encoder_dim, decoder_dim, bias=False)
        elif method == 'concat':
            self.W = nn.Linear(encoder_dim + decoder_dim, decoder_dim)
            self.v = nn.Linear(decoder_dim, 1, bias=False)
        # 'dot' method needs no parameters
    
    def forward(
        self,
        encoder_outputs: torch.Tensor,
        decoder_hidden: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            encoder_outputs: [batch_size, src_len, encoder_dim]
            decoder_hidden: [batch_size, decoder_dim]
        """
        if self.method == 'dot':
            # Requires encoder_dim == decoder_dim
            attention_scores = torch.bmm(
                encoder_outputs,
                decoder_hidden.unsqueeze(2)
            ).squeeze(2)
        
        elif self.method == 'general':
            # Project encoder outputs to decoder dimension
            energy = self.W(encoder_outputs)  # [batch, src_len, decoder_dim]
            attention_scores = torch.bmm(
                energy,
                decoder_hidden.unsqueeze(2)
            ).squeeze(2)
        
        elif self.method == 'concat':
            src_len = encoder_outputs.size(1)
            decoder_expanded = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)
            concat = torch.cat([encoder_outputs, decoder_expanded], dim=2)
            energy = torch.tanh(self.W(concat))
            attention_scores = self.v(energy).squeeze(2)
        
        # Apply mask and normalize
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
        attention_weights = F.softmax(attention_scores, dim=1)
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        
        return context, attention_weights


# Compare attention methods
print("Attention Method Comparison")
print("="*50)

for method in ['dot', 'general', 'concat']:
    if method == 'dot':
        attn = LuongAttention(512, 512, method)  # Same dimensions for dot
        enc_out = torch.randn(32, 20, 512)
        dec_hidden = torch.randn(32, 512)
    else:
        attn = LuongAttention(1024, 512, method)
        enc_out = torch.randn(32, 20, 1024)
        dec_hidden = torch.randn(32, 512)
    
    context, weights = attn(enc_out, dec_hidden)
    params = sum(p.numel() for p in attn.parameters())
    print(f"{method}: {params:,} parameters")

Attention Decoder

class AttentionDecoder(nn.Module):
    """Decoder with attention mechanism."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        encoder_dim: int,
        hidden_dim: int,
        attention: nn.Module,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.vocab_size = vocab_size
        self.attention = attention
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        # LSTM input: embedding + context
        self.lstm = nn.LSTM(
            embed_dim + encoder_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Output: hidden + context
        self.fc_out = nn.Linear(hidden_dim + encoder_dim, vocab_size)
    
    def forward(
        self,
        tgt: torch.Tensor,
        hidden: Tuple[torch.Tensor, torch.Tensor],
        encoder_outputs: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            tgt: [batch_size, 1]
            hidden: (h, c)
            encoder_outputs: [batch_size, src_len, encoder_dim]
            mask: [batch_size, src_len]
        """
        # Embed input
        embedded = self.dropout(self.embedding(tgt))  # [batch, 1, embed_dim]
        
        # Compute attention (use last layer hidden state)
        h_t = hidden[0][-1]  # [batch, hidden_dim]
        context, attention_weights = self.attention(encoder_outputs, h_t, mask)
        
        # Concatenate embedding with context
        lstm_input = torch.cat([embedded, context.unsqueeze(1)], dim=2)
        
        # Forward through LSTM
        output, hidden = self.lstm(lstm_input, hidden)
        output = output.squeeze(1)  # [batch, hidden_dim]
        
        # Concatenate LSTM output with context for prediction
        combined = torch.cat([output, context], dim=1)
        prediction = self.fc_out(combined)
        
        return prediction, hidden, attention_weights
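
A quick shape check, pairing the attention decoder with the Bahdanau attention and the bidirectional encoder dimensions used earlier:
# Test attention decoder (dimensions follow the earlier encoder/attention examples)
attn = BahdanauAttention(encoder_dim=1024, decoder_dim=512, attention_dim=256)
attn_decoder = AttentionDecoder(
    vocab_size=10000, embed_dim=256, encoder_dim=1024,
    hidden_dim=512, attention=attn, num_layers=2
)

tgt_token = torch.randint(0, 10000, (32, 1))
encoder_outputs = torch.randn(32, 20, 1024)
hidden = (torch.randn(2, 32, 512), torch.randn(2, 32, 512))

prediction, hidden, attn_weights = attn_decoder(tgt_token, hidden, encoder_outputs)
print(f"Prediction: {prediction.shape}")           # [32, 10000]
print(f"Attention weights: {attn_weights.shape}")  # [32, 20]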

Beam Search Decoding

Greedy decoding commits to the single most likely token at each step and can miss globally better sequences. Beam search instead keeps the top-k partial hypotheses at every step:
class BeamSearchDecoder:
    """Beam search for sequence generation."""
    
    def __init__(
        self,
        model: nn.Module,
        beam_size: int = 5,
        max_len: int = 50,
        length_penalty: float = 0.6,
        sos_token: int = 1,
        eos_token: int = 2
    ):
        self.model = model
        self.beam_size = beam_size
        self.max_len = max_len
        self.length_penalty = length_penalty
        self.sos_token = sos_token
        self.eos_token = eos_token
    
    def decode(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor
    ) -> List[Tuple[torch.Tensor, float]]:
        """
        Perform beam search decoding.
        
        Returns:
            List of (sequence, score) tuples for each input
        """
        device = src.device
        batch_size = src.size(0)
        
        # Encode source
        encoder_outputs, (hidden, cell) = self.model.encoder(src, src_lengths)
        
        # Process each example in batch separately
        results = []
        
        for b in range(batch_size):
            # Get encoder outputs for this example
            enc_out_b = encoder_outputs[b:b+1]  # [1, src_len, hidden*2]
            h_b = hidden[:, b:b+1, :].contiguous()  # [layers, 1, hidden]
            c_b = cell[:, b:b+1, :].contiguous()
            
            # Initialize beams
            beams = [(
                [self.sos_token],  # tokens
                (h_b, c_b),        # hidden state
                0.0                 # log probability
            )]
            
            completed = []
            
            for step in range(self.max_len):
                all_candidates = []
                
                for tokens, (h, c), score in beams:
                    if tokens[-1] == self.eos_token:
                        completed.append((tokens, score))
                        continue
                    
                    # Get last token
                    last_token = torch.tensor([[tokens[-1]]], device=device)
                    
                    # Decode one step
                    output, (new_h, new_c) = self.model.decoder(
                        last_token, (h, c), enc_out_b
                    )
                    
                    # Get top-k next tokens
                    log_probs = F.log_softmax(output, dim=1)
                    top_probs, top_indices = log_probs.topk(self.beam_size)
                    
                    for i in range(self.beam_size):
                        new_tokens = tokens + [top_indices[0, i].item()]
                        new_score = score + top_probs[0, i].item()
                        all_candidates.append((
                            new_tokens,
                            (new_h, new_c),
                            new_score
                        ))
                
                if not all_candidates:
                    break
                
                # Select top-k beams with length normalization
                all_candidates.sort(
                    key=lambda x: x[2] / (len(x[0]) ** self.length_penalty),
                    reverse=True
                )
                beams = all_candidates[:self.beam_size]
            
            # Add remaining beams to completed
            completed.extend([(tokens, score) for tokens, _, score in beams])
            
            # Sort by normalized score
            completed.sort(
                key=lambda x: x[1] / (len(x[0]) ** self.length_penalty),
                reverse=True
            )
            
            # Return best sequence
            if completed:
                best_tokens, best_score = completed[0]
                results.append((torch.tensor(best_tokens), best_score))
            else:
                results.append((torch.tensor([self.sos_token]), 0.0))
        
        return results


# Example usage
beam_decoder = BeamSearchDecoder(
    model=model,
    beam_size=5,
    max_len=50,
    length_penalty=0.6
)
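
Standard beam search often returns near-duplicate hypotheses. Diverse beam search splits the beams into groups and penalizes tokens that earlier groups have already produced, trading a little likelihood for more varied outputs: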
class DiverseBeamSearch(BeamSearchDecoder):
    """Beam search with diversity penalty."""
    
    def __init__(
        self,
        model: nn.Module,
        beam_size: int = 5,
        num_groups: int = 5,
        diversity_penalty: float = 0.5,
        **kwargs
    ):
        super().__init__(model, beam_size, **kwargs)
        self.num_groups = num_groups
        self.diversity_penalty = diversity_penalty
    
    def decode(self, src, src_lengths):
        """
        Diverse beam search produces multiple diverse outputs.
        """
        # Split beams into groups
        beams_per_group = self.beam_size // self.num_groups
        
        all_results = []
        previous_tokens = set()
        
        for group in range(self.num_groups):
            # Standard beam search with diversity penalty
            results = self._beam_search_with_penalty(
                src, src_lengths, previous_tokens
            )
            
            # Add to previous tokens for diversity
            for tokens, _ in results:
                previous_tokens.update(set(tokens.tolist()))
            
            all_results.extend(results)
        
        return all_results
    
    def _beam_search_with_penalty(self, src, src_lengths, previous_tokens):
        # Sketch: reuse the standard beam search. A full implementation would
        # subtract diversity_penalty from the log-probability of every token
        # already in previous_tokens before ranking candidates.
        return super().decode(src, src_lengths)

Training Techniques

Label Smoothing

class LabelSmoothingLoss(nn.Module):
    """Cross-entropy with label smoothing."""
    
    def __init__(self, vocab_size: int, smoothing: float = 0.1, padding_idx: int = 0):
        super().__init__()
        self.vocab_size = vocab_size
        self.smoothing = smoothing
        self.padding_idx = padding_idx
        self.confidence = 1.0 - smoothing
    
    def forward(self, predictions: torch.Tensor, targets: torch.Tensor):
        """
        Args:
            predictions: [batch*seq_len, vocab_size]
            targets: [batch*seq_len]
        """
        # Create smoothed target distribution
        with torch.no_grad():
            true_dist = torch.zeros_like(predictions)
            true_dist.fill_(self.smoothing / (self.vocab_size - 2))  # -2 for padding and target
            true_dist.scatter_(1, targets.unsqueeze(1), self.confidence)
            true_dist[:, self.padding_idx] = 0
            
            # Zero out padding positions
            mask = (targets == self.padding_idx)
            true_dist[mask] = 0
        
        # KL divergence
        log_probs = F.log_softmax(predictions, dim=-1)
        loss = -(true_dist * log_probs).sum(dim=-1)
        
        # Average over non-padding positions
        non_pad = (~mask).float()
        loss = (loss * non_pad).sum() / non_pad.sum()
        
        return loss


# Test
criterion = LabelSmoothingLoss(vocab_size=10000, smoothing=0.1)
pred = torch.randn(32 * 20, 10000)
target = torch.randint(0, 10000, (32 * 20,))
loss = criterion(pred, target)
print(f"Label smoothing loss: {loss.item():.4f}")

Scheduled Sampling

Gradually reduce teacher forcing during training:
class ScheduledSamplingTrainer:
    """Trainer with scheduled sampling."""
    
    def __init__(
        self,
        model: nn.Module,
        optimizer: torch.optim.Optimizer,
        criterion: nn.Module,
        initial_tf_ratio: float = 1.0,
        decay_rate: float = 0.99,
        min_tf_ratio: float = 0.1
    ):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        
        self.tf_ratio = initial_tf_ratio
        self.decay_rate = decay_rate
        self.min_tf_ratio = min_tf_ratio
    
    def train_step(self, src, src_lengths, tgt):
        self.model.train()
        self.optimizer.zero_grad()
        
        # Forward with current teacher forcing ratio
        outputs = self.model(src, src_lengths, tgt, self.tf_ratio)
        
        # Compute loss (ignore first token which is SOS)
        outputs = outputs[:, 1:].reshape(-1, outputs.size(-1))
        targets = tgt[:, 1:].reshape(-1)
        
        loss = self.criterion(outputs, targets)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        
        self.optimizer.step()
        
        return loss.item()
    
    def update_tf_ratio(self):
        """Decay teacher forcing ratio after each epoch."""
        self.tf_ratio = max(
            self.min_tf_ratio,
            self.tf_ratio * self.decay_rate
        )
        return self.tf_ratio
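
A minimal epoch loop around the trainer (a sketch; train_loader is a hypothetical iterable of (src, src_lengths, tgt) batches):
# Sketch of an epoch loop; train_loader is a hypothetical DataLoader
trainer = ScheduledSamplingTrainer(
    model=model,
    optimizer=torch.optim.Adam(model.parameters(), lr=1e-3),
    criterion=nn.CrossEntropyLoss(ignore_index=0)
)

for epoch in range(10):
    epoch_loss = 0.0
    for src, src_lengths, tgt in train_loader:
        epoch_loss += trainer.train_step(src.to(device), src_lengths, tgt.to(device))
    tf_ratio = trainer.update_tf_ratio()
    print(f"Epoch {epoch}: loss={epoch_loss:.2f}, teacher forcing={tf_ratio:.2f}")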

Practical Applications

Machine Translation

class TranslationModel:
    """Complete translation pipeline."""
    
    def __init__(
        self,
        model: Seq2Seq,
        src_tokenizer,
        tgt_tokenizer,
        beam_size: int = 5
    ):
        self.model = model
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.beam_decoder = BeamSearchDecoder(model, beam_size=beam_size)
    
    def translate(self, text: str) -> str:
        """Translate a single sentence."""
        
        self.model.eval()
        
        # Tokenize source
        src_tokens = self.src_tokenizer.encode(text)
        src = torch.tensor([src_tokens]).to(self.model.device)
        src_length = torch.tensor([len(src_tokens)])
        
        with torch.no_grad():
            results = self.beam_decoder.decode(src, src_length)
        
        # Decode best result
        best_tokens, score = results[0]
        translation = self.tgt_tokenizer.decode(best_tokens.tolist())
        
        return translation
    
    def translate_batch(self, texts: List[str]) -> List[str]:
        """Translate multiple sentences."""
        
        self.model.eval()
        
        # Tokenize and pad
        src_tokens = [self.src_tokenizer.encode(t) for t in texts]
        max_len = max(len(t) for t in src_tokens)
        
        src = torch.zeros(len(texts), max_len, dtype=torch.long)
        src_lengths = torch.zeros(len(texts), dtype=torch.long)
        
        for i, tokens in enumerate(src_tokens):
            src[i, :len(tokens)] = torch.tensor(tokens)
            src_lengths[i] = len(tokens)
        
        src = src.to(self.model.device)
        
        with torch.no_grad():
            results = self.beam_decoder.decode(src, src_lengths)
        
        translations = []
        for tokens, score in results:
            translation = self.tgt_tokenizer.decode(tokens.tolist())
            translations.append(translation)
        
        return translations
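
Wiring it together (a sketch; src_tokenizer and tgt_tokenizer are hypothetical objects exposing encode() and decode()):
# Hypothetical usage; the tokenizers must provide encode() and decode()
translator = TranslationModel(model, src_tokenizer, tgt_tokenizer, beam_size=5)
print(translator.translate("Hello world"))
print(translator.translate_batch(["Hello world", "How are you?"]))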

Text Summarization

class SummarizationModel:
    """Abstractive summarization with copy mechanism."""
    
    def __init__(self, model, tokenizer, max_summary_len=100):
        self.model = model
        self.tokenizer = tokenizer
        self.max_summary_len = max_summary_len
    
    def summarize(
        self,
        text: str,
        min_length: int = 30,
        max_length: int = 100,
        num_beams: int = 4,
        no_repeat_ngram_size: int = 3  # accepted for API parity; not applied in this simplified version
    ) -> str:
        """Generate summary with constraints."""
        
        # Tokenize
        src_tokens = self.tokenizer.encode(text)
        src = torch.tensor([src_tokens]).to(self.model.device)
        src_length = torch.tensor([len(src_tokens)])
        
        # Beam search with constraints
        beam_decoder = BeamSearchDecoder(
            self.model,
            beam_size=num_beams,
            max_len=max_length
        )
        
        with torch.no_grad():
            results = beam_decoder.decode(src, src_length)
        
        # Filter by minimum length
        valid_results = [
            (tokens, score) for tokens, score in results
            if len(tokens) >= min_length
        ]
        
        if not valid_results:
            valid_results = results
        
        best_tokens, _ = valid_results[0]
        summary = self.tokenizer.decode(best_tokens.tolist())
        
        return summary
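
Usage follows the same pattern (a sketch; tokenizer and long_article are hypothetical):
# Hypothetical usage
summarizer = SummarizationModel(model, tokenizer, max_summary_len=100)
print(summarizer.summarize(long_article, min_length=30, max_length=100, num_beams=4))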

Evaluation Metrics

class Seq2SeqMetrics:
    """Evaluation metrics for seq2seq models."""
    
    @staticmethod
    def bleu_score(predictions: List[str], references: List[List[str]]) -> dict:
        """
        Compute BLEU score.
        
        Args:
            predictions: List of predicted sentences
            references: List of reference lists (multiple refs per prediction)
        """
        from collections import Counter
        
        def ngram_counts(tokens, n):
            return Counter(tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1))
        
        def modified_precision(pred, refs, n):
            pred_counts = ngram_counts(pred.split(), n)
            max_ref_counts = Counter()
            
            for ref in refs:
                ref_counts = ngram_counts(ref.split(), n)
                for ngram, count in ref_counts.items():
                    max_ref_counts[ngram] = max(max_ref_counts[ngram], count)
            
            clipped = sum(min(count, max_ref_counts[ngram]) 
                         for ngram, count in pred_counts.items())
            total = sum(pred_counts.values())
            
            return clipped / total if total > 0 else 0
        
        # Compute precision for n=1,2,3,4
        precisions = []
        for n in range(1, 5):
            p = np.mean([modified_precision(pred, refs, n) 
                        for pred, refs in zip(predictions, references)])
            precisions.append(p)
        
        # Brevity penalty (simplified: uses the shortest reference length;
        # standard BLEU uses the reference closest in length to the prediction)
        pred_len = sum(len(p.split()) for p in predictions)
        ref_len = sum(min(len(r.split()) for r in refs) for refs in references)
        bp = np.exp(1 - ref_len / pred_len) if pred_len < ref_len else 1.0
        
        # Geometric mean
        log_precisions = [np.log(p + 1e-10) for p in precisions]
        bleu = bp * np.exp(sum(log_precisions) / 4)
        
        return {
            'bleu': bleu * 100,
            'bleu-1': precisions[0] * 100,
            'bleu-2': precisions[1] * 100,
            'bleu-3': precisions[2] * 100,
            'bleu-4': precisions[3] * 100,
            'brevity_penalty': bp
        }
    
    @staticmethod
    def rouge_score(predictions: List[str], references: List[str]) -> dict:
        """Compute ROUGE-L score."""
        
        def lcs(x, y):
            """Longest common subsequence."""
            m, n = len(x), len(y)
            dp = [[0] * (n + 1) for _ in range(m + 1)]
            
            for i in range(1, m + 1):
                for j in range(1, n + 1):
                    if x[i-1] == y[j-1]:
                        dp[i][j] = dp[i-1][j-1] + 1
                    else:
                        dp[i][j] = max(dp[i-1][j], dp[i][j-1])
            
            return dp[m][n]
        
        rouge_scores = []
        
        for pred, ref in zip(predictions, references):
            pred_tokens = pred.split()
            ref_tokens = ref.split()
            
            lcs_len = lcs(pred_tokens, ref_tokens)
            
            precision = lcs_len / len(pred_tokens) if pred_tokens else 0
            recall = lcs_len / len(ref_tokens) if ref_tokens else 0
            f1 = 2 * precision * recall / (precision + recall + 1e-10)
            
            rouge_scores.append(f1)
        
        return {'rouge-l': np.mean(rouge_scores) * 100}


# Example
metrics = Seq2SeqMetrics()

predictions = ["the cat sat on the mat", "hello world"]
references = [["the cat is on the mat", "a cat sat on a mat"], ["hello world", "hi world"]]

bleu = metrics.bleu_score(predictions, references)
print(f"BLEU Score: {bleu}")

rouge = metrics.rouge_score(predictions, [refs[0] for refs in references])
print(f"ROUGE-L: {rouge}")

Exercises

1. Add a pointer-generator network to copy words directly from the source:
class CopyMechanism(nn.Module):
    def forward(self, context, decoder_state, source):
        # Compute the copy probability (defining W_copy, p_vocab, and
        # p_copy_dist is part of the exercise)
        p_copy = torch.sigmoid(self.W_copy(torch.cat([context, decoder_state], dim=-1)))

        # Blend the vocabulary distribution with the copy distribution
        p_final = (1 - p_copy) * p_vocab + p_copy * p_copy_dist
        return p_final
2. Add coverage attention to prevent repetition by tracking attention history:
class CoverageAttention(nn.Module):
    def forward(self, encoder_outputs, decoder_hidden, coverage):
        # coverage: running sum of attention weights from all previous steps
        # (defining W_coverage and the base attention scores is part of the exercise)
        coverage_penalty = self.W_coverage(coverage)
        attention_scores = standard_attention_scores + coverage_penalty
3. Implement top-p (nucleus) sampling for more diverse generation:
def nucleus_sampling(logits, p=0.9, temperature=1.0):
    probs = F.softmax(logits / temperature, dim=-1)
    sorted_probs, sorted_indices = torch.sort(probs, descending=True)
    cumsum = torch.cumsum(sorted_probs, dim=-1)

    # Find the nucleus: the smallest set of tokens whose cumulative probability reaches p
    nucleus = cumsum < p
    nucleus[..., 0] = True  # always keep the most probable token

    # Sample within the nucleus and map back to the original token ids
    sorted_probs = sorted_probs * nucleus
    sorted_probs = sorted_probs / sorted_probs.sum(dim=-1, keepdim=True)
    choice = torch.multinomial(sorted_probs, num_samples=1)
    return sorted_indices.gather(-1, choice)

What’s Next?