Sequence-to-Sequence: Transforming Sequences

The Seq2Seq Paradigm

Seq2Seq models transform one sequence into another. The core insight: decompose the problem into two stages — first understand the input (encoding), then generate the output (decoding). This mirrors how a human translator works: you read the entire French sentence, build a mental model of its meaning, then produce the English equivalent word by word. This paradigm is remarkably general. Any problem where you have a variable-length input and need a variable-length output can be cast as seq2seq:
  • Machine Translation: “Hello world” -> “Bonjour monde”
  • Summarization: Long document -> Short summary
  • Question Answering: Context + Question -> Answer
  • Code Generation: Natural language description -> Working code
  • Speech Recognition: Audio waveform -> Text transcript
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from typing import List, Tuple, Optional

torch.manual_seed(42)
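
To make the "variable-length input, variable-length output" framing concrete, here is a minimal sketch of how sentence pairs might be turned into padded id tensors. The helper names (`build_vocab`, `to_padded_batch`), the toy sentences, and the special-token ids (0 for padding, 1 for start, 2 for end) are assumptions for illustration, not part of any library.

```python
import torch

# Assumed special-token ids for this sketch
PAD, SOS, EOS = 0, 1, 2

def build_vocab(sentences):
    """Map each distinct whitespace-separated word to an id starting at 3."""
    vocab = {}
    for sentence in sentences:
        for word in sentence.split():
            vocab.setdefault(word, len(vocab) + 3)
    return vocab

def to_padded_batch(sentences, vocab):
    """Encode sentences as <SOS> ... <EOS> id rows, right-padded with PAD."""
    rows = [[SOS] + [vocab[w] for w in s.split()] + [EOS] for s in sentences]
    lengths = torch.tensor([len(r) for r in rows])
    batch = torch.full((len(rows), int(lengths.max())), PAD, dtype=torch.long)
    for i, row in enumerate(rows):
        batch[i, :len(row)] = torch.tensor(row)
    return batch, lengths

# Toy parallel data (illustrative only)
src_sentences = ["hello world", "good morning to you"]
tgt_sentences = ["bonjour monde", "bonjour à vous"]

src_batch, src_batch_lengths = to_padded_batch(src_sentences, build_vocab(src_sentences))
tgt_batch, tgt_batch_lengths = to_padded_batch(tgt_sentences, build_vocab(tgt_sentences))

# Source and target batches have different widths and per-example lengths
print(src_batch.shape, src_batch_lengths.tolist())
print(tgt_batch.shape, tgt_batch_lengths.tolist())
```

The source and target tensors end up with different sequence dimensions, which is exactly the situation the encoder-decoder split is meant to handle.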

Basic Encoder-Decoder Architecture

The Encoder

The encoder reads the input sequence and compresses it into a context vector — a fixed-size summary of the entire input. This is like reading an entire book and then trying to capture its essence in a single paragraph. The bidirectional LSTM below reads the input both forward and backward, giving it a complete picture of the context around each token.
class Encoder(nn.Module):
    """LSTM-based encoder for seq2seq."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Project bidirectional hidden state to decoder size
        self.fc_hidden = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc_cell = nn.Linear(hidden_dim * 2, hidden_dim)
    
    def forward(self, src: torch.Tensor, src_lengths: torch.Tensor):
        """
        Args:
            src: [batch_size, src_len] - source token ids
            src_lengths: [batch_size] - actual lengths (for packing)
        
        Returns:
            outputs: [batch_size, src_len, hidden_dim * 2]
            hidden: (h_n, c_n) each [num_layers, batch_size, hidden_dim]
        """
        # Embed and apply dropout
        embedded = self.dropout(self.embedding(src))
        
        # Pack for efficient computation
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, src_lengths.cpu(), 
            batch_first=True, enforce_sorted=False
        )
        
        # Forward through LSTM
        outputs, (hidden, cell) = self.lstm(packed)
        
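        # Unpack, padding back to the full source length so the output
        # keeps shape [batch_size, src_len, hidden_dim * 2]
        outputs, _ = nn.utils.rnn.pad_packed_sequence(
            outputs, batch_first=True, total_length=src.size(1)
        )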
        
        # Combine bidirectional hidden states
        # hidden: [num_layers * 2, batch_size, hidden_dim]
        # Reshape: [num_layers, 2, batch_size, hidden_dim]
        num_layers = hidden.size(0) // 2
        batch_size = hidden.size(1)
        hidden_dim = hidden.size(2)
        
        hidden = hidden.view(num_layers, 2, batch_size, hidden_dim)
        cell = cell.view(num_layers, 2, batch_size, hidden_dim)
        
        # Concatenate forward and backward
        hidden = torch.cat([hidden[:, 0], hidden[:, 1]], dim=2)  # [num_layers, batch_size, hidden_dim * 2]
        cell = torch.cat([cell[:, 0], cell[:, 1]], dim=2)
        
        # Project to decoder size
        hidden = torch.tanh(self.fc_hidden(hidden))
        cell = torch.tanh(self.fc_cell(cell))
        
        return outputs, (hidden, cell)


# Test encoder
encoder = Encoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
src = torch.randint(0, 10000, (32, 20))  # batch_size=32, src_len=20
src_lengths = torch.randint(10, 20, (32,))

outputs, (hidden, cell) = encoder(src, src_lengths)
print(f"Encoder outputs: {outputs.shape}")  # [32, 20, 1024]
print(f"Hidden state: {hidden.shape}")  # [2, 32, 512]
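
The reshape of the bidirectional hidden state relies on how `nn.LSTM` orders layers and directions in `h_n`. The following sketch checks that layout on a small standalone LSTM with arbitrary dimensions and unpadded inputs. It is a sanity check under those assumptions, not part of the encoder itself.

```python
import torch
import torch.nn as nn

num_layers, batch_size, seq_len, input_dim, hidden_dim = 2, 3, 5, 4, 6

lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
               bidirectional=True, batch_first=True)
x = torch.randn(batch_size, seq_len, input_dim)
out, (h_n, c_n) = lstm(x)

# h_n: [num_layers * 2, batch, hidden] -> [num_layers, 2, batch, hidden]
h = h_n.view(num_layers, 2, batch_size, hidden_dim)

# Top-layer forward state is the forward half of the LAST time step
fwd_matches = torch.allclose(h[-1, 0], out[:, -1, :hidden_dim], atol=1e-6)
# Top-layer backward state is the backward half of the FIRST time step
bwd_matches = torch.allclose(h[-1, 1], out[:, 0, hidden_dim:], atol=1e-6)

print(fwd_matches, bwd_matches)
```

Both checks should hold, which is why index 0 and index 1 along the second axis can be treated as the forward and backward directions before concatenation.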

The Decoder

The decoder generates the output sequence one token at a time:
class Decoder(nn.Module):
    """LSTM decoder without attention (encoder_outputs is accepted but unused)."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        hidden_dim: int,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        self.fc_out = nn.Linear(hidden_dim, vocab_size)
    
    def forward(
        self,
        tgt: torch.Tensor,
        hidden: Tuple[torch.Tensor, torch.Tensor],
        encoder_outputs: Optional[torch.Tensor] = None
    ):
        """
        Args:
            tgt: [batch_size, 1] - current target token
            hidden: (h, c) from previous step
            encoder_outputs: [batch_size, src_len, hidden*2] (optional, for attention)
        
        Returns:
            output: [batch_size, vocab_size] - logits
            hidden: updated hidden state
        """
        # Embed input token
        embedded = self.dropout(self.embedding(tgt))
        
        # Forward through LSTM
        output, hidden = self.lstm(embedded, hidden)
        
        # Project to vocabulary
        prediction = self.fc_out(output.squeeze(1))
        
        return prediction, hidden


# Test decoder
decoder = Decoder(vocab_size=10000, embed_dim=256, hidden_dim=512, num_layers=2)
tgt_token = torch.randint(0, 10000, (32, 1))  # Single token

prediction, new_hidden = decoder(tgt_token, (hidden, cell))
print(f"Prediction: {prediction.shape}")  # [32, 10000]
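
Decoding one token per call works because the LSTM state carries everything the next step needs. As a small illustration, assuming a standalone `nn.LSTM` with arbitrary sizes and no dropout, the sketch below compares a single pass over a whole sequence with a step-by-step pass that threads `(h, c)` through each call.

```python
import torch
import torch.nn as nn

lstm = nn.LSTM(input_size=8, hidden_size=16, num_layers=2, batch_first=True)
lstm.eval()

x = torch.randn(4, 5, 8)  # [batch, tgt_len, embed_dim]

with torch.no_grad():
    # Whole sequence in one call (initial state defaults to zeros)
    full_out, _ = lstm(x)

    # Same inputs, one time step per call, passing the state forward
    state = None
    step_outs = []
    for t in range(x.size(1)):
        out_t, state = lstm(x[:, t:t + 1, :], state)
        step_outs.append(out_t)
    stepwise_out = torch.cat(step_outs, dim=1)

max_diff = (full_out - stepwise_out).abs().max().item()
print(f"Max difference: {max_diff:.2e}")
```

The two results should agree up to floating-point noise, so the per-step interface loses nothing compared with processing the sequence in one call.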

Complete Seq2Seq Model

class Seq2Seq(nn.Module):
    """Complete sequence-to-sequence model."""
    
    def __init__(
        self,
        encoder: Encoder,
        decoder: Decoder,
        device: torch.device
    ):
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    
    def forward(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor,
        tgt: torch.Tensor,
        teacher_forcing_ratio: float = 0.5
    ):
        """
        Args:
            src: [batch_size, src_len]
            src_lengths: [batch_size]
            tgt: [batch_size, tgt_len]
            teacher_forcing_ratio: probability of using ground truth
        
        Returns:
            outputs: [batch_size, tgt_len, vocab_size]
        """
        batch_size = src.size(0)
        tgt_len = tgt.size(1)
        vocab_size = self.decoder.vocab_size
        
        # Encode source sequence
        encoder_outputs, hidden = self.encoder(src, src_lengths)
        
        # Store decoder outputs
        outputs = torch.zeros(batch_size, tgt_len, vocab_size).to(self.device)
        
        # First input is <SOS> token
        input_token = tgt[:, 0:1]
        
        for t in range(1, tgt_len):
            # Decode one step
            output, hidden = self.decoder(input_token, hidden, encoder_outputs)
            outputs[:, t, :] = output
            
            # Teacher forcing
            use_teacher_forcing = random.random() < teacher_forcing_ratio
            
            if use_teacher_forcing:
                input_token = tgt[:, t:t+1]
            else:
                input_token = output.argmax(dim=1, keepdim=True)
        
        return outputs
    
    def generate(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor,
        max_len: int = 50,
        sos_token: int = 1,
        eos_token: int = 2
    ) -> torch.Tensor:
        """Generate output sequence using greedy decoding."""
        
        batch_size = src.size(0)
        
        # Encode
        encoder_outputs, hidden = self.encoder(src, src_lengths)
        
        # Initialize with SOS
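        input_token = torch.full(
            (batch_size, 1), sos_token, dtype=torch.long, device=self.device
        )
        
        generated = [input_token]
        # Track which sequences have emitted EOS at any step so far
        finished = torch.zeros(batch_size, dtype=torch.bool, device=self.device)
        
        for _ in range(max_len):
            output, hidden = self.decoder(input_token, hidden, encoder_outputs)
            input_token = output.argmax(dim=1, keepdim=True)
            generated.append(input_token)
            
            # Stop once every sequence has produced EOS, even at different steps.
            # Tokens emitted after a sequence's EOS should be discarded by the caller.
            finished |= input_token.squeeze(1) == eos_token
            if finished.all():
                break
        
        return torch.cat(generated, dim=1)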


# Create full model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(10000, 256, 512, 2).to(device)
decoder = Decoder(10000, 256, 512, 2).to(device)
model = Seq2Seq(encoder, decoder, device)

print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")
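
Because `forward` never writes to position 0 of `outputs` and targets are padded, the loss is normally computed on positions 1 onward with padding ignored. The sketch below shows one way to do that on stand-in tensors. The padding id of 0 and the toy target ids are assumptions.

```python
import torch
import torch.nn.functional as F

PAD_IDX = 0  # assumed padding id
batch_size, tgt_len, vocab_size = 2, 5, 11

# Stand-in for the model output: position 0 is left as zeros
logits = torch.randn(batch_size, tgt_len, vocab_size)
logits[:, 0, :] = 0.0

# Targets: <SOS>=1 first, <EOS>=2 as the last real token, then padding
tgt = torch.tensor([[1, 5, 6, 2, 0],
                    [1, 7, 2, 0, 0]])

# Drop position 0, flatten, and let ignore_index skip padded targets
flat_logits = logits[:, 1:].reshape(-1, vocab_size)
flat_tgt = tgt[:, 1:].reshape(-1)
loss = F.cross_entropy(flat_logits, flat_tgt, ignore_index=PAD_IDX)

# Equivalent manual version: keep only non-padding positions, then average
keep = flat_tgt != PAD_IDX
manual = F.cross_entropy(flat_logits[keep], flat_tgt[keep])

print(f"loss={loss.item():.4f} manual={manual.item():.4f} scored={int(keep.sum())}")
```

Only the five real target tokens contribute to the average, so padded positions neither add loss nor dilute it.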

Attention Mechanisms

The Information Bottleneck Problem

The basic encoder-decoder has a fatal flaw: it forces ALL information about the source sentence through a single fixed-size vector. Imagine trying to translate a 500-word paragraph, but you can only pass information through a single 512-dimensional vector. Long sequences inevitably lose information.

Attention solves this elegantly by allowing the decoder to “look back” at every position in the source sequence at every step of generation. Instead of one summary vector, the decoder gets direct access to all encoder hidden states and learns to focus on the most relevant ones. When translating “le chat” to “the cat,” the decoder learns to attend strongly to “chat” when generating “cat.” This innovation, introduced by Bahdanau et al. in 2014, was so fundamental that it eventually evolved into the self-attention mechanism that powers modern Transformers.
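
A tiny numeric sketch of that "look back" step may help. It scores each encoder state against the current decoder state, normalizes the scores with a softmax, and takes the weighted sum. The values are made up, and a plain dot product is used as the score purely for brevity.

```python
import torch
import torch.nn.functional as F

# Three source positions with 2-dimensional encoder states (made-up values)
encoder_states = torch.tensor([[1.0, 0.0],
                               [0.0, 1.0],
                               [1.0, 1.0]])
# Current decoder state, used as the query
decoder_state = torch.tensor([2.0, 0.0])

scores = encoder_states @ decoder_state   # one relevance score per source position
weights = F.softmax(scores, dim=0)        # scores -> probability distribution
context = weights @ encoder_states        # weighted sum of encoder states

print(scores)   # tensor([2., 0., 2.])
print(weights)
print(context)
```

Positions 0 and 2 score equally and receive most of the weight, while position 1 contributes little to the context vector.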

Bahdanau Attention (Additive)

class BahdanauAttention(nn.Module):
    """Additive attention mechanism (Bahdanau et al., 2014).
    
    Computes attention scores by passing encoder and decoder states through
    a small neural network (one hidden layer + tanh). Called "additive" because
    encoder and decoder projections are summed before the tanh activation.
    
    Slightly more expressive than dot-product attention but also slower
    due to the extra parameters and non-linearity.
    """
    
    def __init__(self, encoder_dim: int, decoder_dim: int, attention_dim: int):
        super().__init__()
        
        # Project encoder and decoder states to a common dimensionality
        self.W_encoder = nn.Linear(encoder_dim, attention_dim)
        self.W_decoder = nn.Linear(decoder_dim, attention_dim)
        # Final projection to a scalar attention score per position
        self.v = nn.Linear(attention_dim, 1, bias=False)
    
    def forward(
        self,
        encoder_outputs: torch.Tensor,
        decoder_hidden: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            encoder_outputs: [batch_size, src_len, encoder_dim]
            decoder_hidden: [batch_size, decoder_dim]
            mask: [batch_size, src_len] - True for padding positions
        
        Returns:
            context: [batch_size, encoder_dim]
            attention_weights: [batch_size, src_len]
        """
        src_len = encoder_outputs.size(1)
        
        # Project encoder outputs
        encoder_proj = self.W_encoder(encoder_outputs)  # [batch, src_len, attention_dim]
        
        # Project decoder hidden (repeat for each source position)
        decoder_proj = self.W_decoder(decoder_hidden).unsqueeze(1)  # [batch, 1, attention_dim]
        decoder_proj = decoder_proj.repeat(1, src_len, 1)  # [batch, src_len, attention_dim]
        
        # Compute attention scores
        energy = torch.tanh(encoder_proj + decoder_proj)  # [batch, src_len, attention_dim]
        attention_scores = self.v(energy).squeeze(-1)  # [batch, src_len]
        
        # Apply mask (set padding positions to -inf)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
        # Normalize with softmax
        attention_weights = F.softmax(attention_scores, dim=1)
        
        # Weighted sum of encoder outputs
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs)
        context = context.squeeze(1)  # [batch, encoder_dim]
        
        return context, attention_weights


# Test
attention = BahdanauAttention(1024, 512, 256)
encoder_outputs = torch.randn(32, 20, 1024)
decoder_hidden = torch.randn(32, 512)

context, weights = attention(encoder_outputs, decoder_hidden)
print(f"Context: {context.shape}")  # [32, 1024]
print(f"Attention weights: {weights.shape}")  # [32, 20]
print(f"Weights sum: {weights.sum(dim=1)}")  # Should be ~1.0
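
The `mask` argument expects `True` at padding positions, but the test above does not pass one. A minimal sketch of building such a mask from sequence lengths follows. The helper name `make_padding_mask` is hypothetical.

```python
import torch
import torch.nn.functional as F

def make_padding_mask(lengths: torch.Tensor, max_len: int) -> torch.Tensor:
    """Return a [batch, max_len] bool mask that is True at padding positions."""
    positions = torch.arange(max_len, device=lengths.device).unsqueeze(0)
    return positions >= lengths.unsqueeze(1)

lengths = torch.tensor([5, 3, 1])
mask = make_padding_mask(lengths, max_len=5)

# Any raw scores will do; masked positions are set to -inf before the softmax
scores = torch.randn(3, 5)
weights = F.softmax(scores.masked_fill(mask, float('-inf')), dim=1)

print(mask)
print(weights)
print(weights.sum(dim=1))
```

Padding positions receive exactly zero weight and each row still sums to one. A length of 0 would mask an entire row and produce NaNs, so empty sequences need to be filtered out beforehand.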

Luong Attention (Multiplicative)

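class LuongAttention(nn.Module):
    """Luong-style attention with 'dot', 'general', or 'concat' scoring.
    
    'dot' and 'general' are multiplicative and cheaper than Bahdanau attention;
    'concat' is an additive-style score.
    """
    
    def __init__(self, encoder_dim: int, decoder_dim: int, method: str = 'general'):
        super().__init__()
        
        if method not in ('dot', 'general', 'concat'):
            raise ValueError(f"Unknown attention method: {method}")
        self.method = method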
        
        if method == 'general':
            self.W = nn.Linear(encoder_dim, decoder_dim, bias=False)
        elif method == 'concat':
            self.W = nn.Linear(encoder_dim + decoder_dim, decoder_dim)
            self.v = nn.Linear(decoder_dim, 1, bias=False)
        # 'dot' method needs no parameters
    
    def forward(
        self,
        encoder_outputs: torch.Tensor,
        decoder_hidden: torch.Tensor,
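        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            encoder_outputs: [batch_size, src_len, encoder_dim]
            decoder_hidden: [batch_size, decoder_dim]
            mask: [batch_size, src_len] - True for padding positions
        
        Returns:
            context: [batch_size, encoder_dim]
            attention_weights: [batch_size, src_len]
        """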
        if self.method == 'dot':
            # Requires encoder_dim == decoder_dim
            attention_scores = torch.bmm(
                encoder_outputs,
                decoder_hidden.unsqueeze(2)
            ).squeeze(2)
        
        elif self.method == 'general':
            # Project encoder outputs to decoder dimension
            energy = self.W(encoder_outputs)  # [batch, src_len, decoder_dim]
            attention_scores = torch.bmm(
                energy,
                decoder_hidden.unsqueeze(2)
            ).squeeze(2)
        
        elif self.method == 'concat':
            src_len = encoder_outputs.size(1)
            decoder_expanded = decoder_hidden.unsqueeze(1).repeat(1, src_len, 1)
            concat = torch.cat([encoder_outputs, decoder_expanded], dim=2)
            energy = torch.tanh(self.W(concat))
            attention_scores = self.v(energy).squeeze(2)
        
        # Apply mask and normalize
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
        attention_weights = F.softmax(attention_scores, dim=1)
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)
        
        return context, attention_weights


# Compare attention methods
print("Attention Method Comparison")
print("="*50)

for method in ['dot', 'general', 'concat']:
    if method == 'dot':
        attn = LuongAttention(512, 512, method)  # Same dimensions for dot
        enc_out = torch.randn(32, 20, 512)
        dec_hidden = torch.randn(32, 512)
    else:
        attn = LuongAttention(1024, 512, method)
        enc_out = torch.randn(32, 20, 1024)
        dec_hidden = torch.randn(32, 512)
    
    context, weights = attn(enc_out, dec_hidden)
    params = sum(p.numel() for p in attn.parameters())
    print(f"{method}: {params:,} parameters")
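
For reference, the three scoring branches in `LuongAttention.forward` can be written as follows. The notation is an assumption of this sketch: $h_i$ is the encoder output at source position $i$, $s_t$ is the decoder hidden state at step $t$, and $W$, $b$, $v$ are the parameters of the corresponding `nn.Linear` layers.

```latex
\begin{aligned}
\text{dot:}     \quad & \operatorname{score}(h_i, s_t) = h_i^{\top} s_t \\
\text{general:} \quad & \operatorname{score}(h_i, s_t) = (W h_i)^{\top} s_t \\
\text{concat:}  \quad & \operatorname{score}(h_i, s_t) = v^{\top} \tanh\!\left(W [h_i ; s_t] + b\right) \\
\text{weights:} \quad & \alpha_{t,i} = \frac{\exp\!\big(\operatorname{score}(h_i, s_t)\big)}{\sum_{j} \exp\!\big(\operatorname{score}(h_j, s_t)\big)} \\
\text{context:} \quad & c_t = \sum_{i} \alpha_{t,i}\, h_i
\end{aligned}
```

The `dot` form has no parameters and requires matching encoder and decoder dimensions, while `general` and `concat` add parameters that let the two dimensions differ.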

Attention Decoder

class AttentionDecoder(nn.Module):
    """Decoder with attention mechanism."""
    
    def __init__(
        self,
        vocab_size: int,
        embed_dim: int,
        encoder_dim: int,
        hidden_dim: int,
        attention: nn.Module,
        num_layers: int = 2,
        dropout: float = 0.1
    ):
        super().__init__()
        
        self.vocab_size = vocab_size
        self.attention = attention
        
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.dropout = nn.Dropout(dropout)
        
        # LSTM input: embedding + context
        self.lstm = nn.LSTM(
            embed_dim + encoder_dim,
            hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        
        # Output: hidden + context
        self.fc_out = nn.Linear(hidden_dim + encoder_dim, vocab_size)
    
    def forward(
        self,
        tgt: torch.Tensor,
        hidden: Tuple[torch.Tensor, torch.Tensor],
        encoder_outputs: torch.Tensor,
        mask: Optional[torch.Tensor] = None
    ):
        """
        Args:
            tgt: [batch_size, 1]
            hidden: (h, c)
            encoder_outputs: [batch_size, src_len, encoder_dim]
            mask: [batch_size, src_len]
        """
        # Embed input
        embedded = self.dropout(self.embedding(tgt))  # [batch, 1, embed_dim]
        
        # Compute attention (use last layer hidden state)
        h_t = hidden[0][-1]  # [batch, hidden_dim]
        context, attention_weights = self.attention(encoder_outputs, h_t, mask)
        
        # Concatenate embedding with context
        lstm_input = torch.cat([embedded, context.unsqueeze(1)], dim=2)
        
        # Forward through LSTM
        output, hidden = self.lstm(lstm_input, hidden)
        output = output.squeeze(1)  # [batch, hidden_dim]
        
        # Concatenate LSTM output with context for prediction
        combined = torch.cat([output, context], dim=1)
        prediction = self.fc_out(combined)
        
        return prediction, hidden, attention_weights
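
Since this decoder returns three values, a decoding loop has to unpack the attention weights as well. The sketch below shows one way to collect them into a `[batch, steps, src_len]` matrix for inspection. `ToyAttentionStep` is a hypothetical single-layer stand-in with the same call signature and return values, used only so the loop can run on its own. It is not the `AttentionDecoder` above.

```python
import torch
import torch.nn as nn

class ToyAttentionStep(nn.Module):
    """Hypothetical stand-in: forward(tgt, hidden, encoder_outputs, mask) -> (logits, hidden, weights)."""

    def __init__(self, vocab_size=12, hidden_dim=8, encoder_dim=8):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        self.cell = nn.LSTMCell(hidden_dim + encoder_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim + encoder_dim, vocab_size)

    def forward(self, tgt, hidden, encoder_outputs, mask=None):
        h, c = hidden
        # Dot-product scores (requires encoder_dim == hidden_dim)
        scores = torch.bmm(encoder_outputs, h.unsqueeze(2)).squeeze(2)
        if mask is not None:
            scores = scores.masked_fill(mask, float('-inf'))
        weights = torch.softmax(scores, dim=1)
        context = torch.bmm(weights.unsqueeze(1), encoder_outputs).squeeze(1)
        emb = self.embedding(tgt.squeeze(1))
        h, c = self.cell(torch.cat([emb, context], dim=1), (h, c))
        logits = self.fc_out(torch.cat([h, context], dim=1))
        return logits, (h, c), weights

@torch.no_grad()
def greedy_decode_with_attention(step, hidden, encoder_outputs, max_len=6, sos_token=1):
    """Greedy loop that also records the attention weights of every step."""
    batch_size = encoder_outputs.size(0)
    token = torch.full((batch_size, 1), sos_token, dtype=torch.long)
    tokens, attn = [], []
    for _ in range(max_len):
        logits, hidden, weights = step(token, hidden, encoder_outputs)
        token = logits.argmax(dim=1, keepdim=True)
        tokens.append(token)
        attn.append(weights)
    # tokens: [batch, max_len]; attention: [batch, max_len, src_len]
    return torch.cat(tokens, dim=1), torch.stack(attn, dim=1)

step = ToyAttentionStep()
enc = torch.randn(2, 7, 8)
init = (torch.zeros(2, 8), torch.zeros(2, 8))
decoded, attention_map = greedy_decode_with_attention(step, init, enc)
print(decoded.shape, attention_map.shape)
```

Each row of `attention_map[b]` is a distribution over source positions for one generated token, which is the matrix usually plotted as an alignment heatmap.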

Beam Search Decoding

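Greedy decoding commits to the single most likely token at each step, so it can miss sequences whose total probability is higher. Beam search keeps the beam_size best partial hypotheses at every step and extends each of them: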
class BeamSearchDecoder:
    """Beam search for sequence generation."""
    
    def __init__(
        self,
        model: nn.Module,
        beam_size: int = 5,
        max_len: int = 50,
        length_penalty: float = 0.6,
        sos_token: int = 1,
        eos_token: int = 2
    ):
        self.model = model
        self.beam_size = beam_size
        self.max_len = max_len
        self.length_penalty = length_penalty
        self.sos_token = sos_token
        self.eos_token = eos_token
    
    def decode(
        self,
        src: torch.Tensor,
        src_lengths: torch.Tensor
    ) -> List[Tuple[torch.Tensor, float]]:
        """
        Perform beam search decoding.
        
        Returns:
            List of (sequence, score) tuples for each input
        """
        device = src.device
        batch_size = src.size(0)
        
        # Encode source
        encoder_outputs, (hidden, cell) = self.model.encoder(src, src_lengths)
        
        # Process each example in batch separately
        results = []
        
        for b in range(batch_size):
            # Get encoder outputs for this example
            enc_out_b = encoder_outputs[b:b+1]  # [1, src_len, hidden*2]
            h_b = hidden[:, b:b+1, :].contiguous()  # [layers, 1, hidden]
            c_b = cell[:, b:b+1, :].contiguous()
            
            # Initialize beams
            beams = [(
                [self.sos_token],  # tokens
                (h_b, c_b),        # hidden state
                0.0                 # log probability
            )]
            
            completed = []
            
            for step in range(self.max_len):
                all_candidates = []
                
                for tokens, (h, c), score in beams:
                    if tokens[-1] == self.eos_token:
                        completed.append((tokens, score))
                        continue
                    
                    # Get last token
                    last_token = torch.tensor([[tokens[-1]]], device=device)
                    
                    # Decode one step
                    output, (new_h, new_c) = self.model.decoder(
                        last_token, (h, c), enc_out_b
                    )
                    
                    # Get top-k next tokens
                    log_probs = F.log_softmax(output, dim=1)
                    top_probs, top_indices = log_probs.topk(self.beam_size)
                    
                    for i in range(self.beam_size):
                        new_tokens = tokens + [top_indices[0, i].item()]
                        new_score = score + top_probs[0, i].item()
                        all_candidates.append((
                            new_tokens,
                            (new_h, new_c),
                            new_score
                        ))
                
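                if not all_candidates:
                    # All beams ended in EOS and are already in `completed`;
                    # clear them so they are not added a second time below
                    beams = []
                    break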
                
                # Select top-k beams with length normalization
                all_candidates.sort(
                    key=lambda x: x[2] / (len(x[0]) ** self.length_penalty),
                    reverse=True
                )
                beams = all_candidates[:self.beam_size]
            
            # Add remaining beams to completed
            completed.extend([(tokens, score) for tokens, _, score in beams])
            
            # Sort by normalized score
            completed.sort(
                key=lambda x: x[1] / (len(x[0]) ** self.length_penalty),
                reverse=True
            )
            
            # Return best sequence
            if completed:
                best_tokens, best_score = completed[0]
                results.append((torch.tensor(best_tokens), best_score))
            else:
                results.append((torch.tensor([self.sos_token]), 0.0))
        
        return results


# Example usage
beam_decoder = BeamSearchDecoder(
    model=model,
    beam_size=5,
    max_len=50,
    length_penalty=0.6
)
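
To see why greedy decoding can miss a better sequence, consider a toy model whose next-token probabilities are given by a lookup table. Everything here (the table, the token names, the `search` helper) is hypothetical, and length normalization is left out to keep the comparison simple.

```python
import math

# Hypothetical next-token distributions keyed by the prefix generated so far
TABLE = {
    (): {"A": 0.5, "B": 0.4, "<eos>": 0.1},
    ("A",): {"x": 0.4, "y": 0.3, "<eos>": 0.3},
    ("B",): {"x": 0.9, "y": 0.05, "<eos>": 0.05},
}
END = {"<eos>": 1.0}  # any prefix not listed above simply terminates

def search(beam_size, max_len=3):
    """Return the most probable completed (tokens, log_prob) for a beam width."""
    beams = [((), 0.0)]
    completed = []
    for _ in range(max_len):
        candidates = []
        for prefix, score in beams:
            for token, p in TABLE.get(prefix, END).items():
                hyp = (prefix + (token,), score + math.log(p))
                if token == "<eos>":
                    completed.append(hyp)
                else:
                    candidates.append(hyp)
        # Keep only the beam_size most probable unfinished hypotheses
        beams = sorted(candidates, key=lambda h: h[1], reverse=True)[:beam_size]
        if not beams:
            break
    return max(completed, key=lambda h: h[1])

greedy_tokens, greedy_lp = search(beam_size=1)
beam_tokens, beam_lp = search(beam_size=2)

print(greedy_tokens, round(math.exp(greedy_lp), 2))  # ('A', 'x', '<eos>') 0.2
print(beam_tokens, round(math.exp(beam_lp), 2))      # ('B', 'x', '<eos>') 0.36
```

With a width of 1 the search locks onto "A" because it is the best first token, while a width of 2 keeps "B" alive long enough to discover the higher-probability continuation.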

Diverse Beam Search

class DiverseBeamSearch(BeamSearchDecoder):
    """Beam search with diversity penalty."""
    
    def __init__(
        self,
        model: nn.Module,
        beam_size: int = 5,
        num_groups: int = 5,
        diversity_penalty: float = 0.5,
        **kwargs
    ):
        super().__init__(model, beam_size, **kwargs)
        self.num_groups = num_groups
        self.diversity_penalty = diversity_penalty
    
    def decode(self, src, src_lengths):
        """
        Diverse beam search produces multiple diverse outputs.
        """
        # Split beams into groups
        beams_per_group = self.beam_size // self.num_groups
        
        all_results = []
        previous_tokens = set()
        
        for group in range(self.num_groups):
            # Standard beam search with diversity penalty
            results = self._beam_search_with_penalty(
                src, src_lengths, previous_tokens
            )
            
            # Add to previous tokens for diversity
            for tokens, _ in results:
                previous_tokens.update(set(tokens.tolist()))
            
            all_results.extend(results)
        
        return all_results
    
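    def _beam_search_with_penalty(self, src, src_lengths, previous_tokens):
        # Placeholder: the implementation is similar to the base beam search
        # but with a penalty for tokens in previous_tokens.
        # decode() cannot run until this method returns a list of
        # (sequence, score) tuples.
        raise NotImplementedError(
            "_beam_search_with_penalty is a placeholder and must be implemented"
        )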
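
One possible form of the penalty step, shown as a sketch rather than as the intended implementation: subtract a constant from the log-probability of every token that an earlier group already used, before picking the top candidates. The function name `penalize_seen_tokens` and the choice of a flat subtraction are assumptions.

```python
import torch
import torch.nn.functional as F

def penalize_seen_tokens(log_probs: torch.Tensor, previous_tokens: set, penalty: float) -> torch.Tensor:
    """Subtract `penalty` from the log-probability of each id in `previous_tokens`.

    log_probs: [1, vocab_size] next-token log-probabilities (not modified in place).
    """
    if not previous_tokens:
        return log_probs
    penalized = log_probs.clone()
    idx = torch.tensor(sorted(previous_tokens), dtype=torch.long, device=log_probs.device)
    penalized[:, idx] -= penalty
    return penalized

logits = torch.tensor([[2.0, 1.9, 0.5, 0.1]])
log_probs = F.log_softmax(logits, dim=1)

before = log_probs.argmax(dim=1).item()
after = penalize_seen_tokens(log_probs, previous_tokens={0}, penalty=0.5).argmax(dim=1).item()
print(before, after)  # 0 1
```

Token 0 wins without the penalty, but once it is marked as already used, the close runner-up takes over, which is how later groups are nudged toward different outputs.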

Training Techniques

Label Smoothing

class LabelSmoothingLoss(nn.Module):
    """Cross-entropy with label smoothing."""
    
    def __init__(self, vocab_size: int, smoothing: float = 0.1, padding_idx: int = 0):
        super().__init__()
        self.vocab_size = vocab_size
        self.smoothing = smoothing
        self.padding_idx = padding_idx
        self.confidence = 1.0 - smoothing
    
    def forward(self, predictions: torch.Tensor, targets: torch.Tensor):
        """
        Args:
            predictions: [batch*seq_len, vocab_size]
            targets: [batch*seq_len]
        """
        # Create smoothed target distribution
        with torch.no_grad():
            true_dist = torch.zeros_like(predictions)
            true_dist.fill_(self.smoothing / (self.vocab_size - 2))  # -2 for padding and target
            true_dist.scatter_(1, targets.unsqueeze(1), self.confidence)
            true_dist[:, self.padding_idx] = 0
            
            # Zero out padding positions
            mask = (targets == self.padding_idx)
            true_dist[mask] = 0
        
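        # Cross-entropy against the smoothed targets (KL divergence up to a constant)
        log_probs = F.log_softmax(predictions, dim=-1)
        loss = -(true_dist * log_probs).sum(dim=-1)
        
        # Average over non-padding positions (guard against an all-padding batch)
        non_pad = (~mask).float()
        loss = (loss * non_pad).sum() / non_pad.sum().clamp(min=1.0)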
        
        return loss


# Test
criterion = LabelSmoothingLoss(vocab_size=10000, smoothing=0.1)
pred = torch.randn(32 * 20, 10000)
target = torch.randint(0, 10000, (32 * 20,))
loss = criterion(pred, target)
print(f"Label smoothing loss: {loss.item():.4f}")
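
It can help to look at the smoothed target distribution on a vocabulary small enough to read. The sketch below repeats the same construction steps as the loss above for a 5-token vocabulary. The sizes and target ids are arbitrary.

```python
import torch

vocab_size, smoothing, padding_idx = 5, 0.1, 0
targets = torch.tensor([2, 4, 0])  # the last target is padding

# Spread the smoothing mass over all tokens except the target and padding
true_dist = torch.full((targets.size(0), vocab_size), smoothing / (vocab_size - 2))
# Put the remaining confidence on the true target
true_dist.scatter_(1, targets.unsqueeze(1), 1.0 - smoothing)
# Padding never receives probability mass
true_dist[:, padding_idx] = 0
# Rows whose target is padding are zeroed entirely
true_dist[targets == padding_idx] = 0

print(true_dist)
print(true_dist.sum(dim=1))
```

Each real row puts 0.9 on the target and splits the remaining 0.1 across the three other non-padding tokens, so it still sums to one. The padding row is all zeros and contributes no loss.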

Scheduled Sampling

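Scheduled sampling gradually lowers the teacher forcing ratio during training, so the decoder increasingly conditions on its own predictions, as it must at inference time: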
class ScheduledSamplingTrainer:
    """Trainer with scheduled sampling."""
    
    def __init__(
        self,
        model: nn.Module,
        optimizer: torch.optim.Optimizer,
        criterion: nn.Module,
        initial_tf_ratio: float = 1.0,
        decay_rate: float = 0.99,
        min_tf_ratio: float = 0.1
    ):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        
        self.tf_ratio = initial_tf_ratio
        self.decay_rate = decay_rate
        self.min_tf_ratio = min_tf_ratio
    
    def train_step(self, src, src_lengths, tgt):
        self.model.train()
        self.optimizer.zero_grad()
        
        # Forward with current teacher forcing ratio
        outputs = self.model(src, src_lengths, tgt, self.tf_ratio)
        
        # Compute loss (ignore first token which is SOS)
        outputs = outputs[:, 1:].reshape(-1, outputs.size(-1))
        targets = tgt[:, 1:].reshape(-1)
        
        loss = self.criterion(outputs, targets)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        
        self.optimizer.step()
        
        return loss.item()
    
    def update_tf_ratio(self):
        """Decay teacher forcing ratio after each epoch."""
        self.tf_ratio = max(
            self.min_tf_ratio,
            self.tf_ratio * self.decay_rate
        )
        return self.tf_ratio
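
The default decay of 0.99 per epoch is quite slow. The sketch below replays the same update rule as `update_tf_ratio` outside the trainer to show how the ratio evolves. The helper name `tf_ratio_schedule` is hypothetical.

```python
def tf_ratio_schedule(num_epochs, initial=1.0, decay_rate=0.99, min_ratio=0.1):
    """Teacher forcing ratio used in each epoch, with the decay applied after every epoch."""
    ratios = []
    ratio = initial
    for _ in range(num_epochs):
        ratios.append(ratio)
        ratio = max(min_ratio, ratio * decay_rate)
    return ratios

ratios = tf_ratio_schedule(300)

# First epoch at which the ratio has reached the floor
first_floor_epoch = next(i for i, r in enumerate(ratios) if r == 0.1)

print(ratios[0], round(ratios[50], 3), round(ratios[100], 3), first_floor_epoch)
```

With these defaults the ratio only reaches its floor after roughly 230 epochs, so shorter training runs may want a smaller `decay_rate` or a per-step schedule instead.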

Practical Applications

Machine Translation

class TranslationModel:
    """Complete translation pipeline."""
    
    def __init__(
        self,
        model: Seq2Seq,
        src_tokenizer,
        tgt_tokenizer,
        beam_size: int = 5
    ):
        self.model = model
        self.src_tokenizer = src_tokenizer
        self.tgt_tokenizer = tgt_tokenizer
        self.beam_decoder = BeamSearchDecoder(model, beam_size=beam_size)
    
    def translate(self, text: str) -> str:
        """Translate a single sentence."""
        
        self.model.eval()
        
        # Tokenize source
        src_tokens = self.src_tokenizer.encode(text)
        src = torch.tensor([src_tokens]).to(self.model.device)
        src_length = torch.tensor([len(src_tokens)])
        
        with torch.no_grad():
            results = self.beam_decoder.decode(src, src_length)
        
        # Decode best result
        best_tokens, score = results[0]
        translation = self.tgt_tokenizer.decode(best_tokens.tolist())
        
        return translation
    
    def translate_batch(self, texts: List[str]) -> List[str]:
        """Translate multiple sentences."""
        
        self.model.eval()
        
        # Tokenize and pad
        src_tokens = [self.src_tokenizer.encode(t) for t in texts]
        max_len = max(len(t) for t in src_tokens)
        
        src = torch.zeros(len(texts), max_len, dtype=torch.long)
        src_lengths = torch.zeros(len(texts), dtype=torch.long)
        
        for i, tokens in enumerate(src_tokens):
            src[i, :len(tokens)] = torch.tensor(tokens)
            src_lengths[i] = len(tokens)
        
        src = src.to(self.model.device)
        
        with torch.no_grad():
            results = self.beam_decoder.decode(src, src_lengths)
        
        translations = []
        for tokens, score in results:
            translation = self.tgt_tokenizer.decode(tokens.tolist())
            translations.append(translation)
        
        return translations
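
`TranslationModel` only assumes that each tokenizer exposes `encode(text)` returning a list of ids and `decode(ids)` returning a string. A minimal whitespace tokenizer satisfying that interface might look like the following. The class, its special-token ids, and the choice to wrap sequences in start and end tokens are all assumptions for illustration.

```python
from typing import Dict, List

class WhitespaceTokenizer:
    """Hypothetical tokenizer exposing the encode()/decode() interface."""

    PAD, SOS, EOS, UNK = 0, 1, 2, 3

    def __init__(self, corpus: List[str]):
        self.token_to_id: Dict[str, int] = {}
        for sentence in corpus:
            for word in sentence.lower().split():
                # Ids 0-3 are reserved for the special tokens
                self.token_to_id.setdefault(word, len(self.token_to_id) + 4)
        self.id_to_token = {i: w for w, i in self.token_to_id.items()}

    def encode(self, text: str) -> List[int]:
        ids = [self.token_to_id.get(w, self.UNK) for w in text.lower().split()]
        return [self.SOS] + ids + [self.EOS]

    def decode(self, ids: List[int]) -> str:
        words = []
        for i in ids:
            if i == self.EOS:
                break  # ignore anything generated after the end token
            if i in (self.PAD, self.SOS):
                continue
            words.append(self.id_to_token.get(i, "<unk>"))
        return " ".join(words)

tokenizer = WhitespaceTokenizer(["the cat sat", "the dog ran"])
ids = tokenizer.encode("The cat ran fast")
print(ids)                    # [1, 4, 5, 8, 3, 2]
print(tokenizer.decode(ids))  # the cat ran <unk>
```

Stopping at the end token inside `decode` matters here because both greedy and beam decoding can return ids past the first end token.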

Text Summarization

class SummarizationModel:
    """Abstractive summarization via beam search (no copy mechanism is implemented here)."""
    
    def __init__(self, model, tokenizer, max_summary_len=100):
        self.model = model
        self.tokenizer = tokenizer
        self.max_summary_len = max_summary_len
    
    def summarize(
        self,
        text: str,
        min_length: int = 30,
        max_length: int = 100,
        num_beams: int = 4,
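        no_repeat_ngram_size: int = 3
    ) -> str:
        """Generate a summary with beam search.
        
        Note: no_repeat_ngram_size is accepted but not enforced, because
        BeamSearchDecoder has no n-gram blocking.
        """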
        
        # Tokenize
        src_tokens = self.tokenizer.encode(text)
        src = torch.tensor([src_tokens]).to(self.model.device)
        src_length = torch.tensor([len(src_tokens)])
        
        # Beam search with constraints
        beam_decoder = BeamSearchDecoder(
            self.model,
            beam_size=num_beams,
            max_len=max_length
        )
        
        with torch.no_grad():
            results = beam_decoder.decode(src, src_length)
        
        # Filter by minimum length
        valid_results = [
            (tokens, score) for tokens, score in results
            if len(tokens) >= min_length
        ]
        
        if not valid_results:
            valid_results = results
        
        best_tokens, _ = valid_results[0]
        summary = self.tokenizer.decode(best_tokens.tolist())
        
        return summary
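
The `no_repeat_ngram_size` constraint is usually enforced by banning, at each step, any token that would complete an n-gram already present in the hypothesis. A minimal sketch of that check follows. The helper name `banned_next_tokens` is hypothetical and it is not wired into `BeamSearchDecoder`.

```python
from typing import List, Set

def banned_next_tokens(tokens: List[int], n: int) -> Set[int]:
    """Tokens that would complete an n-gram already present in `tokens`."""
    if n <= 0 or len(tokens) < n:
        return set()
    # The last n-1 tokens form the prefix of the n-gram about to be completed
    prefix = tuple(tokens[len(tokens) - (n - 1):]) if n > 1 else ()
    banned = set()
    for i in range(len(tokens) - n + 1):
        ngram = tuple(tokens[i:i + n])
        if ngram[:-1] == prefix:
            banned.add(ngram[-1])
    return banned

print(banned_next_tokens([5, 6, 7, 5, 6], n=3))  # {7}
print(banned_next_tokens([5, 6, 7, 8], n=3))     # set()
```

Inside a beam search loop, the log-probabilities of the banned ids would be set to negative infinity before taking the top-k candidates.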

Evaluation Metrics

class Seq2SeqMetrics:
    """Evaluation metrics for seq2seq models."""
    
    @staticmethod
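    def bleu_score(predictions: List[str], references: List[List[str]]) -> dict:
        """
        Compute a simplified BLEU score.
        
        N-gram precisions are averaged over sentences (not pooled over the
        corpus as in standard corpus-level BLEU), and the brevity penalty
        uses the shortest reference for each prediction.
        
        Args:
            predictions: List of predicted sentences
            references: List of reference lists (multiple refs per prediction)
        """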
        from collections import Counter
        
        def ngram_counts(tokens, n):
            return Counter(tuple(tokens[i:i+n]) for i in range(len(tokens) - n + 1))
        
        def modified_precision(pred, refs, n):
            pred_counts = ngram_counts(pred.split(), n)
            max_ref_counts = Counter()
            
            for ref in refs:
                ref_counts = ngram_counts(ref.split(), n)
                for ngram, count in ref_counts.items():
                    max_ref_counts[ngram] = max(max_ref_counts[ngram], count)
            
            clipped = sum(min(count, max_ref_counts[ngram]) 
                         for ngram, count in pred_counts.items())
            total = sum(pred_counts.values())
            
            return clipped / total if total > 0 else 0
        
        # Compute precision for n=1,2,3,4
        precisions = []
        for n in range(1, 5):
            p = np.mean([modified_precision(pred, refs, n) 
                        for pred, refs in zip(predictions, references)])
            precisions.append(p)
        
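        # Brevity penalty: per sentence, use the reference length closest to
        # the prediction length (ties go to the shorter reference)
        pred_len = sum(len(p.split()) for p in predictions)
        ref_len = sum(
            min((len(r.split()) for r in refs),
                key=lambda n_ref: (abs(n_ref - len(pred.split())), n_ref))
            for pred, refs in zip(predictions, references)
        )
        if pred_len == 0:
            bp = 0.0  # empty predictions: avoid dividing by zero
        else:
            bp = np.exp(1 - ref_len / pred_len) if pred_len < ref_len else 1.0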
        
        # Geometric mean
        log_precisions = [np.log(p + 1e-10) for p in precisions]
        bleu = bp * np.exp(sum(log_precisions) / 4)
        
        return {
            'bleu': bleu * 100,
            'bleu-1': precisions[0] * 100,
            'bleu-2': precisions[1] * 100,
            'bleu-3': precisions[2] * 100,
            'bleu-4': precisions[3] * 100,
            'brevity_penalty': bp
        }
    
    @staticmethod
    def rouge_score(predictions: List[str], references: List[str]) -> dict:
        """Compute ROUGE-L score."""
        
        def lcs(x, y):
            """Longest common subsequence."""
            m, n = len(x), len(y)
            dp = [[0] * (n + 1) for _ in range(m + 1)]
            
            for i in range(1, m + 1):
                for j in range(1, n + 1):
                    if x[i-1] == y[j-1]:
                        dp[i][j] = dp[i-1][j-1] + 1
                    else:
                        dp[i][j] = max(dp[i-1][j], dp[i][j-1])
            
            return dp[m][n]
        
        rouge_scores = []
        
        for pred, ref in zip(predictions, references):
            pred_tokens = pred.split()
            ref_tokens = ref.split()
            
            lcs_len = lcs(pred_tokens, ref_tokens)
            
            precision = lcs_len / len(pred_tokens) if pred_tokens else 0
            recall = lcs_len / len(ref_tokens) if ref_tokens else 0
            f1 = 2 * precision * recall / (precision + recall + 1e-10)
            
            rouge_scores.append(f1)
        
        return {'rouge-l': np.mean(rouge_scores) * 100}


# Example
metrics = Seq2SeqMetrics()

predictions = ["the cat sat on the mat", "hello world"]
references = [["the cat is on the mat", "a cat sat on a mat"], ["hello world", "hi world"]]

bleu = metrics.bleu_score(predictions, references)
print(f"BLEU Score: {bleu}")

rouge = metrics.rouge_score(predictions, [refs[0] for refs in references])
print(f"ROUGE-L: {rouge}")

Exercises

Add pointer-generator network to copy words from source:
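class CopyMechanism(nn.Module):
    def forward(self, context, decoder_state, source):
        # Compute copy probability from the concatenated context and decoder state
        # (self.W_copy is an nn.Linear you define in __init__)
        p_copy = torch.sigmoid(self.W_copy(torch.cat([context, decoder_state], dim=-1)))
        
        # Blend vocabulary and copy distributions
        # (p_vocab and p_copy_dist are left for you to compute from the decoder
        # output and the attention weights over `source`)
        p_final = (1 - p_copy) * p_vocab + p_copy * p_copy_dist
        return p_final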
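If you want something to check your solution against, here is a minimal sketch of the blending step. It assumes the attention weights over the source and the decoder's vocabulary logits are already computed elsewhere; the class name `CopyMechanismSketch` and the extra `forward` arguments are illustrative, not part of the exercise skeleton. It also ignores out-of-vocabulary source words, which a full pointer-generator handles with an extended vocabulary.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class CopyMechanismSketch(nn.Module):
    """Blend a vocabulary distribution with a copy distribution over source tokens."""

    def __init__(self, context_dim: int, state_dim: int):
        super().__init__()
        self.W_copy = nn.Linear(context_dim + state_dim, 1)

    def forward(self, context, decoder_state, attn_weights, src_ids, vocab_logits):
        # context: (B, C), decoder_state: (B, H)
        # attn_weights: (B, S), attention over source positions, rows sum to 1
        # src_ids: (B, S) int64 source token ids, vocab_logits: (B, V)
        gate_input = torch.cat([context, decoder_state], dim=-1)
        p_copy = torch.sigmoid(self.W_copy(gate_input))            # (B, 1)
        p_vocab = F.softmax(vocab_logits, dim=-1)                   # (B, V)
        # Scatter attention mass onto the vocabulary ids of the source tokens;
        # repeated source tokens accumulate their weights
        p_copy_dist = torch.zeros_like(p_vocab).scatter_add(1, src_ids, attn_weights)
        return (1 - p_copy) * p_vocab + p_copy * p_copy_dist       # (B, V)
```

Because both inputs are proper distributions and the gate is a convex combination, each output row still sums to 1.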
Prevent repetition by tracking attention history:
class CoverageAttention(nn.Module):
    def forward(self, encoder_outputs, decoder_hidden, coverage):
        # coverage: sum of attention weights from previous steps
        coverage_penalty = self.W_coverage(coverage)
        attention = standard_attention + coverage_penalty
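A fuller sketch of one common coverage formulation, for comparison with your own solution. Note the swap: instead of adding a penalty to finished attention weights as in the skeleton above, this version feeds the coverage vector into an additive (Bahdanau-style) score as an extra feature and returns a coverage loss you can add to the training objective. Layer names, dimensions, and the return signature are assumptions made for illustration.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class CoverageAttentionSketch(nn.Module):
    """Additive attention with a coverage feature (illustrative sketch)."""

    def __init__(self, enc_dim: int, dec_dim: int, attn_dim: int):
        super().__init__()
        self.W_enc = nn.Linear(enc_dim, attn_dim, bias=False)
        self.W_dec = nn.Linear(dec_dim, attn_dim)
        self.W_coverage = nn.Linear(1, attn_dim, bias=False)
        self.v = nn.Linear(attn_dim, 1, bias=False)

    def forward(self, encoder_outputs, decoder_hidden, coverage):
        # encoder_outputs: (B, S, E), decoder_hidden: (B, D)
        # coverage: (B, S), sum of attention weights from previous steps
        features = (
            self.W_enc(encoder_outputs)
            + self.W_dec(decoder_hidden).unsqueeze(1)
            + self.W_coverage(coverage.unsqueeze(-1))
        )
        scores = self.v(torch.tanh(features)).squeeze(-1)       # (B, S)
        attn = F.softmax(scores, dim=-1)
        # Penalize attending again to positions that are already covered
        coverage_loss = torch.min(attn, coverage).sum(dim=-1)   # (B,)
        new_coverage = coverage + attn
        context = torch.bmm(attn.unsqueeze(1), encoder_outputs).squeeze(1)  # (B, E)
        return context, attn, new_coverage, coverage_loss
```

At the first decoding step the coverage vector is all zeros, so the coverage loss is zero and the updated coverage equals the attention weights.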
Top-p sampling for more diverse generation:
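def nucleus_sampling(logits, p=0.9, temperature=1.0):
    probs = F.softmax(logits / temperature, dim=-1)
    sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
    cumsum = torch.cumsum(sorted_probs, dim=-1)
    
    # Find nucleus: keep tokens whose preceding cumulative mass is below p,
    # so the token that crosses the threshold (and always the top token) is kept
    nucleus = (cumsum - sorted_probs) < p
    
    # Sample from nucleus: zero out the tail, renormalize, and draw one token
    nucleus_probs = sorted_probs * nucleus
    nucleus_probs = nucleus_probs / nucleus_probs.sum(dim=-1, keepdim=True)
    choice = torch.multinomial(nucleus_probs, num_samples=1)
    return sorted_indices.gather(-1, choice)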

What’s Next?

Self-Supervised Learning

Learn representations without labels

Reinforcement Learning for DL

RLHF, PPO, and reward optimization

Interview Deep-Dive

Strong Answer:

In a basic encoder-decoder, the encoder compresses the entire input sequence into a single fixed-size context vector, typically the last hidden state. This is like asking someone to summarize a 500-page book in a single sentence and then asking another person to reconstruct the book from that sentence. No matter how large you make that context vector, there is a fundamental information bottleneck: long sequences contain more information than any fixed-size vector can faithfully represent.

The symptoms are clear in practice: translation quality degrades sharply for sentences beyond 20-30 tokens, and the model struggles to capture long-range dependencies. The first and last few words of the input tend to be represented well (recency and primacy effects in the RNN), but the middle portions are effectively lost.

Attention (Bahdanau et al., 2014) eliminates the bottleneck by allowing the decoder to “look back” at every encoder hidden state at every decoding step. Instead of one context vector, the decoder gets a weighted combination of all encoder hidden states, with the weights computed dynamically based on what the decoder needs at each step. When generating the verb in a translation, the attention weights concentrate on the source verb. When generating a noun, they shift to the source noun.

Mathematically, at each decoding step t, attention computes context_t = sum_i(alpha_{t,i} * h_i), where alpha_{t,i} = softmax_i(score(s_t, h_i)), s_t is the decoder state at step t, and h_i is the i-th encoder hidden state. The score function can be dot-product, additive (Bahdanau), or multiplicative (Luong). The context vector is now different at every step, carrying the information the decoder needs for that step.

The scaling implication: this is also the foundation for the Transformer’s self-attention, which generalizes the idea by allowing every position to attend to every other position, not just decoder-to-encoder. The conceptual leap from seq2seq attention to Transformer self-attention is smaller than it appears: it is the same attention mechanism applied bidirectionally within a single sequence.

Follow-up: Why did the Transformer architecture completely replace LSTM-based seq2seq models? What did LSTMs do better?

The Transformer won on two fronts: parallelism and long-range dependencies. An LSTM processes tokens sequentially: token t+1 depends on the hidden state from token t. This means you cannot parallelize across time steps during training. Transformers compute all positions simultaneously via matrix multiplications, achieving massive speedups on GPUs. For a sequence of length N, LSTM training takes O(N) sequential steps; Transformer training takes O(1) sequential steps per layer (though each step is O(N^2) in compute).

For long-range dependencies, LSTMs must propagate information through N sequential cells, each with a gating mechanism that can leak or distort the signal. Transformers connect any two positions with a single attention step, regardless of distance.

What LSTMs did better: memory efficiency for long sequences. Standard Transformer self-attention is O(N^2) in memory, while LSTMs are O(N). For very long sequences (100K+ tokens), this matters. LSTMs also have an inductive bias toward sequential processing that makes them data-efficient on small sequence modeling tasks. You rarely hear about this advantage because most modern tasks have enough data that the Transformer’s flexibility outweighs the LSTM’s inductive bias.
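The attention formula in the strong answer above (a softmax over scores, then a weighted sum of encoder states) can be sketched in a few lines. This is a minimal illustration using dot-product scoring, which assumes the decoder state and encoder states share the same dimension; the function name `attention_context` and the optional `src_mask` argument are assumptions for the example, not names from this page.

```python
import torch
import torch.nn.functional as F


def attention_context(decoder_state, encoder_states, src_mask=None):
    """Dot-product attention for a single decoding step.

    decoder_state:  (B, H)    s_t
    encoder_states: (B, S, H) h_1 .. h_S
    src_mask:       (B, S)    True for real tokens, False for padding
    """
    # score(s_t, h_i) = s_t . h_i for every source position i
    scores = torch.bmm(encoder_states, decoder_state.unsqueeze(-1)).squeeze(-1)  # (B, S)
    if src_mask is not None:
        # Padding positions get zero weight after the softmax
        scores = scores.masked_fill(~src_mask, float("-inf"))
    alpha = F.softmax(scores, dim=-1)                                            # (B, S)
    # context_t = sum_i alpha_{t,i} * h_i
    context = torch.bmm(alpha.unsqueeze(1), encoder_states).squeeze(1)           # (B, H)
    return context, alpha
```

Calling this once per decoding step with the current decoder state yields a different context vector at every step, which is exactly what removes the fixed-size bottleneck.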
Strong Answer:

Greedy decoding selects the highest-probability token at each step. It is fast (one forward pass per token) and deterministic, but it often produces suboptimal sequences because it cannot recover from a locally good but globally poor choice. It is like navigating a maze by always turning toward the exit: you get stuck in dead ends.

Beam search maintains the top-k (beam width) partial sequences at each step, expanding all of them and keeping the best k. It approximates a search over the full output space without the exponential cost. A beam width of 4-5 is standard for machine translation, where there is usually one “correct” output. The trade-off is k-fold more compute per token.

Nucleus (top-p) sampling restricts random sampling to the smallest set of tokens whose cumulative probability exceeds p (typically 0.9-0.95), then samples from that set. This balances diversity with quality by excluding the long tail of unlikely tokens.

When to use each in production: beam search for tasks with “correct” answers where quality matters (translation, summarization, code generation), because it reliably finds high-probability outputs. Nucleus sampling for creative or conversational tasks (chatbots, story generation) where diversity and naturalness matter more than finding the single best sequence. Greedy decoding for latency-critical applications where quality can be slightly sacrificed, or as a baseline to validate that more expensive decoding actually helps.

A production nuance people miss: beam search can produce degenerate outputs (repetitive text, empty sequences) because high-probability sequences are often boring or repetitive. Production systems add length normalization (divide the sequence log-probability by its length to avoid favoring short outputs), repetition penalties (reduce the probability of recently generated tokens), and sometimes a minimum length constraint.

Follow-up: You are building a code completion system. You notice beam search produces correct but boring completions while sampling produces creative but often incorrect code. How do you get the best of both?

A common approach is sample-and-rerank (best-of-N sampling). Generate N candidate completions using nucleus sampling (for diversity), then re-rank them using the model’s own log-probability (or a separate verifier model) and return the highest-scoring candidate. This gives you the diversity of sampling with quality filtering comparable to what beam search provides.

More sophisticated approaches add an execution-based filter that discards candidates that do not compile or that fail basic tests. A production code assistant can generate multiple candidates in parallel, score them with a verifier, and present the best one. The key insight is that generation and verification are separate problems, and using different strategies for each outperforms using a single strategy for both.
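The generate-then-verify idea above can be sketched without any model at all. The example below assumes each candidate arrives as a pair of completion text and per-token log-probabilities; the helper names (`length_normalized_score`, `parses_as_python`, `rerank`) and the sample data are hypothetical, and the verifier is only a cheap syntax check via Python's standard `ast` module, standing in for compilation or unit tests.

```python
import ast
from typing import Callable, List, Optional, Sequence, Tuple

# (completion text, per-token log-probabilities); the format is an assumption
Candidate = Tuple[str, Sequence[float]]


def length_normalized_score(token_logprobs: Sequence[float]) -> float:
    """Average log-probability per token, so short candidates are not favored."""
    if not token_logprobs:
        return float("-inf")
    return sum(token_logprobs) / len(token_logprobs)


def parses_as_python(code: str) -> bool:
    """Cheap verifier: does the completion at least parse as Python?"""
    try:
        ast.parse(code)
        return True
    except SyntaxError:
        return False


def rerank(candidates: List[Candidate],
           verifier: Optional[Callable[[str], bool]] = None) -> Optional[str]:
    """Drop candidates the verifier rejects, then return the best-scoring one."""
    kept = [c for c in candidates if verifier is None or verifier(c[0])]
    if not kept:
        return None
    best_text, _ = max(kept, key=lambda c: length_normalized_score(c[1]))
    return best_text


# Hypothetical sampled completions with made-up log-probabilities
samples = [
    ("return a +", [-0.1, -0.1, -0.1]),              # most likely, but does not parse
    ("x = a + b", [-0.4, -0.3, -0.5, -0.2, -0.6]),
    ("x = a - b", [-0.9, -0.8, -1.2, -0.7, -1.0]),
]
print(rerank(samples, verifier=parses_as_python))
```

Without the verifier, the truncated first candidate wins on likelihood alone, which is the failure mode the separate verification step is meant to catch.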
Strong Answer:

Teacher forcing feeds the ground-truth previous token as input to the decoder at each training step, rather than the decoder’s own prediction. If the correct output is “the cat sat,” at step 3 the decoder receives “the cat” as input regardless of what it predicted at steps 1 and 2. This makes training faster and more stable because the decoder always sees correct context, which provides clear gradient signals.

The problem is exposure bias: during inference, there is no ground truth, so the decoder feeds its own predictions back as input. If it makes an error at step 2, that error compounds at step 3 and beyond because the model was never trained on its own erroneous outputs. It is like a student who practices piano only by reading sheet music one note at a time but never plays a whole piece: they fall apart when they have to maintain coherence across measures.

Mitigation strategies:

(1) Scheduled sampling (Bengio et al., 2015): during training, gradually increase the probability of using the model’s own prediction instead of the ground truth. For example, start with 100% teacher forcing and linearly decrease to 50% over training. This exposes the model to its own errors during training.

(2) Sequence-level training with REINFORCE: optimize the actual sequence-level metric (BLEU, ROUGE) using policy gradient methods, which naturally trains the model on its own outputs.

(3) For modern Transformer models, the problem is less severe because self-attention gives each token direct access to all previous tokens, making the model more robust to individual token errors.

In practice, scheduled sampling provides the best effort-to-improvement ratio for LSTM-based seq2seq. For Transformer-based models, teacher forcing with proper regularization (label smoothing, dropout) works well enough that scheduled sampling is rarely used, since direct attention over the whole prefix tends to make the Transformer more robust to exposure bias.

Follow-up: How does label smoothing help with exposure bias in seq2seq models?

Label smoothing replaces the hard target distribution (1.0 on the correct token, 0.0 on everything else) with a soft distribution (for a smoothing value of 0.1: 0.9 on the correct token, 0.1 distributed across all other tokens). This has two effects that reduce exposure bias. First, the model never learns to be 100% confident in any single token, which means at inference time its probability distribution is wider and more recovery-friendly: if it makes a wrong prediction, the context is less “broken” because the model was always somewhat uncertain. Second, it regularizes the model against overfitting to the exact training sequences, encouraging it to learn more general patterns that transfer better to the auto-regressive inference setting.

Empirically, label smoothing of 0.1 is commonly reported to give on the order of 0.5-1.0 BLEU improvement on machine translation tasks. It is essentially free compute-wise and is a sensible default for any seq2seq model.
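To make the label smoothing answer concrete, the sketch below builds the smoothed target distribution by hand and computes the loss against it. The helper names `smoothed_targets` and `smoothed_cross_entropy` are illustrative. One detail to be aware of: this version follows the convention used by PyTorch's built-in `label_smoothing` argument, which spreads the smoothing mass uniformly over all classes including the correct one, so the correct token ends up with slightly more than 0.9.

```python
import torch
import torch.nn.functional as F


def smoothed_targets(target: torch.Tensor, num_classes: int, eps: float = 0.1) -> torch.Tensor:
    """Soft targets: (1 - eps) on the gold token plus eps / K on every class."""
    one_hot = F.one_hot(target, num_classes).float()
    return (1.0 - eps) * one_hot + eps / num_classes


def smoothed_cross_entropy(logits: torch.Tensor, target: torch.Tensor, eps: float = 0.1) -> torch.Tensor:
    """Cross-entropy against the smoothed targets, averaged over the batch."""
    log_probs = F.log_softmax(logits, dim=-1)
    q = smoothed_targets(target, logits.size(-1), eps)
    return -(q * log_probs).sum(dim=-1).mean()


# Toy batch: 4 decoder positions over a 6-token vocabulary
torch.manual_seed(0)
logits = torch.randn(4, 6)
target = torch.tensor([0, 2, 5, 1])

manual = smoothed_cross_entropy(logits, target, eps=0.1)
builtin = F.cross_entropy(logits, target, label_smoothing=0.1)
print(torch.allclose(manual, builtin, atol=1e-6))
```

In a real training loop you would normally just pass `label_smoothing=0.1` to `F.cross_entropy` or `nn.CrossEntropyLoss`; the manual version is only there to show what the target distribution looks like.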